aepbui
发布于 2024-08-25 / 146 阅读
0
0

AQS

概述

AbstractQueuedSynchronizer抽象的队列同步器,用来构建锁或者其他同步器组建的基础框架,通过内置的CLH队列来完成资源获取线程的排队功能(为暂时还没有抢到锁的线程管理进入队列中,这个队列就是AQS的抽象表现),通过int类型的state来表示持有锁的状态,使用CAS对该同步状态进行原子操作实现对其值的修改

只要内部有使用了该类,说明该类使用到了AQS,如Semaphore、CountDownLatch、CyclicBarrier、ReentrantReadWriteLock、ReentrantLock等等,是JUC包的基石,统一规范并简化了锁的实现,屏蔽了同步状态管理、阻塞线程排队通知、唤醒机制等

CLH队列

将暂时获取不到锁的线程加入到该队列中,AQS将每个共享资源的线程封装成一个个节点Node,一个节点代表一个线程,它保存着线程的引用thread, 节点的状态waitStatus,前指针prev和后指针next

前置内容

学习AQS需要简单了解一下CAS和LockSuport,以下是这2个内容简单的概述

CAS可以参考这里

LockSuport

LockSupport提供了一种更底层和灵活的线程调度方式,不依赖于同步块或特定的锁对象,可以用于构建更复杂的同步结构,例如自定义锁或并发容器.park与unpark的组合使得线程之间的精确控制变得更容易,而不需要复杂的同步逻辑和对象监视

使用synchronized

synchronized关键字支持通过Object的wait和notify进行等待和唤醒,但是这种方式无法脱离synchronized代码块,并且notify和wait严格顺序限制

// Object objectLock = new Object();
new Thread(() -> {
    synchronized (objectLock) {
        try {
            objectLock.wait(); // 如果没有先被阻塞无法被唤醒
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}).start();
new Thread(() -> {
    synchronized (objectLock) {
        lock.notify();
    }
}).start();

使用Condition

Condition同样,进行await和signal不可以脱离lock和unlock使用,并且await和signal也有严格执行顺序,必须一致否则也会导致死锁

// Lock lock = new ReentrantLock();
// Condition condition = lock.newCondition();
new Thread(() -> {
    lock.lock();
    try {
        condition.await();
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        lock.unlock();
    }
}).start();
new Thread(() -> {
    lock.lock();
    try {
        condition.signal();
    } finally {
        lock.unlock();
    }
}).start();

所以得出结论synchronized和Condition也就是使用方式不同,但是底层逻辑大致相同

LockSupport优势

LockSupport解决了上述2个问题,它提供了Permit许可的概念来进行阻塞和唤醒,每个线程都有一个许可,Permit只有0和1两种情况,默认为0

线程阻塞需要消耗Permit,最多只有1个

当调用park方法时候

  • 如果有Permit,则直接消耗掉这个Permit然后正常执行

  • 如果没有Permit,则必须阻塞直到获得Permit

当调用unpark则会获得一个Permit

不需要锁块,不需要锁对象,即使先unpark也不会影响线程恢复

Thread thread = new Thread(() -> {
    ThreadUtil.sleep(5000);
    LockSupport.park();
});
thread.start();
Thread thread2 = new Thread(() -> {
    LockSupport.unpark(thread);
});
thread2.start();

AQS源码

通过state字段判断是否阻塞,加入CLH队列从尾部入队,从头部出队

// 尾指针
private transient volatile Node tail;
// 头指针
private transient volatile Node head;
// 当资源的同步状态
private volatile int state;

CLH队列中的node节点,waitStatus节点状态

  • 0 初始化的默认值

  • 1 线程获取锁的请求被取消了

  • -2 节点在等待队列中,等待被唤醒

  • -3 当前状态处于SHARED时才会被使用

  • -1 线程已准备好,等待资源被释放

// AQS队列中的节点
static final class Node {
  static final Node SHARED = new Node();
  // 当前Node线程状态
  volatile int waitStatus;
  // 前指针
  volatile Node prev;
  // 后指针
  volatile Node next;
  // 等待线程
  volatile Thread thread;
}

队列同步器

Lock接口的实现类基本通过聚合了一个队列同步器的子类来完成线程的访问控制

例如调用ReentrantLock的lock方法,实际上是调用Sync的lock方法,Sync是ReentrantLock的内部抽象类,它继承了AbstractQueuedSynchronizer,并且Sync被公平FairSync和非公平NonfairSync两个实现类继承

public void lock() {
    sync.lock();
}

通过构造方法来确定是公平锁还是非公平锁

public ReentrantLock() {
    sync = new NonfairSync();
}

此处使用非公平锁来举例

第一个线程执行

state的值为0,直接加锁

final void lock() {
    // 加锁前,通过CAS先判断锁是否被抢占,并将state设置为1
    if (compareAndSetState(0, 1))
        // 没有被占用,将当前占用该锁的线程设置为当前线程
        setExclusiveOwnerThread(Thread.currentThread());
    else
        // 如果第二个线程访问,锁没有被释放
        acquire(1);
}

setExclusiveOwnerThread调用的是AQS的父类AbstractOwnableSynchronizer

private transient Thread exclusiveOwnerThread;
// 当第一个线程访问时将当前独占所有者线程设置为当前线程
protected final void setExclusiveOwnerThread(Thread thread) {
    exclusiveOwnerThread = thread;
}
protected final Thread getExclusiveOwnerThread() {
    return exclusiveOwnerThread;
}

线程1直接占用锁

第二个线程阻塞

acquire获取锁

依次执行tryAcquire、addWaiter、acquireQueued

public final void acquire(int arg) { // arg 为1
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

tryAcquire尝试获取锁

发现锁被占用返回false

protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}
​
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState(); // c的值为1
    if (c == 0) {
        // ...省略
    }
    else if (current == getExclusiveOwnerThread()) { // 被线程1占用
        // ...省略
    }
    return false;
}
​
protected final int getState() {
    return state;
}

一般情况下来说,第2个线程访问直接返回false,但是也有2种特殊情况

再次尝试获取锁.如果线程1刚刚执行完成释放锁,线程2立刻就调用了getState,这个时候getState为0.此时可以再次获取锁

if (c == 0) {
    if (compareAndSetState(0, acquires)) {
        setExclusiveOwnerThread(current);
        return true;
    }
}

可重入锁的实现,当线程1获取了锁以后,再次执行到这里尝试获取锁.这种情况也可以直接获取到锁

else if (current == getExclusiveOwnerThread()) {
    int nextc = c + acquires;
    if (nextc < 0)
        throw new Error("Maximum lock count exceeded");
    setState(nextc);
    return true;
}

addWaiter添加等待者,执行排队逻辑

如果tryAcquire返回false,则继续执行addWaiter(Node.EXCLUSIVE)

如果pred != null则说明队列已经初始化过,只需要将新加入的node添加到尾部即可

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

如果pred == null 则说明未曾入队过,需要执行队列初始化

private Node enq(final Node node) { // 形参node是存储着第二个线程的节点
    for (;;) {
        Node t = tail;
        if (t == null) { // 初始化队列部分
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t; // 将节点入队部分
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

将以上代码拆开

第一次循环执行的代码

进入队列,如果队列为空,首先初始化一个节点作为占位符(哨兵节点),或者也可以理解为未曾入队的线程1

如果是哨兵节点入队,则头尾节点都指向它

该链表中,第一个节点是哨兵节点,只占位并不存储任何信息,真正存储数据的节点从第二个节点开始

Node t = tail;
if (t == null) { // 如果队列中没有尾指针,则创建一个作为头节点
    if (compareAndSetHead(new Node())) // 设置头节点
        tail = head; // 设置尾节点
}

第二次循环

else {
    node.prev = t; // 形参node的前指针指向哨兵节点
    if (compareAndSetTail(t, node)) { // 尾指针指向形参node
        t.next = node; // 哨兵节点的后指针指向形参node
        return t;
    }
}

进行排队

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor(); // 哨兵节点
            if (p == head && tryAcquire(arg)) { // 哨兵节点是头节点但是线程1占用了锁
                // ...省略
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

获取前置节点,此处获取的是哨兵节点

final Node predecessor() throws NullPointerException {
    Node p = prev;
    if (p == null)
        throw new NullPointerException();
    else
        return p;
}

获取锁失败后需要阻塞

每进入队列,之后的队列都会将前队列的waitStatus设置为-1,也就是说除了最后加入的节点为0其他的节点waitStatus都为-1

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus; // 哨兵节点是0
    if (ws == Node.SIGNAL) // 0 != -1
        return true;
    if (ws > 0) {
        // ...省略
    } else {
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL); // 将前节点的waitStatus设置为-1
    }
    return false;
}

哨兵节点的waitStatus为-1,第二次循环shouldParkAfterFailedAcquire执行返回true,继续执行parkAndCheckInterrupt

if (shouldParkAfterFailedAcquire(p, node) &&
    parkAndCheckInterrupt())
    interrupted = true;

此时线程2以及之后的线程执行到此处,终于处于阻塞状态

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this); // this是线程2,线程阻塞在这一行
    return Thread.interrupted();
}

如果还有第3、第4线程进入,以上执行流程一致

第一个线程解锁

线程1执行了lock.unlock()

public void unlock() {
    sync.release(1);
}

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

尝试解锁,当前锁被线程1占用并且aqs的state为1

protected final boolean tryRelease(int releases) {
    int c = getState() - releases; // c = 1 - 1
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null); // 设置当前的占用线程为null
    }
    setState(c); // 将aqs的state设置为0
    return free;
}

尝试释放锁成功,继续执行

public final boolean release(int arg) {
    if (tryRelease(arg)) { // true
        Node h = head; // 哨兵节点
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

选择继任者

private void unparkSuccessor(Node node) {
    int ws = node.waitStatus; // 哨兵节点的waitStatus为-1 
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0); // 将哨兵节点的waitStatus设置为0
    Node s = node.next; // 哨兵节点的下一个节点是线程2
    if (s == null || s.waitStatus > 0) { // 线程2的waitStatus为0
        // ...省略
    }
    if (s != null)
        LockSupport.unpark(s.thread); // 解锁
}

第二个线程恢复执行

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted(); // 查看当前线程是否中断,并重置中断标记
}

/*
public static boolean interrupted() {
    return currentThread().isInterrupted(true);
}

private native boolean isInterrupted(boolean ClearInterrupted);
*/

线程2占用锁,并将aqs的stats设置为1,占用头节点

此时线程2节点变成哨兵节点,原哨兵节点出队

持有锁的线程是不会在队列中的

恢复线程执行会调用parkAndCheckInterrupt,如果线程中断了将interrupted标记为true

当一个节点执行unlock以后,该节点的下一个节点(也就是继任者)会苏醒,获取锁成功以后会将该节点设置为头节点,并且将中断标记返回

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) { // 线程2获取锁
                setHead(node);
                p.next = null;
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt()) // 恢复执行后继续执行for循环
                interrupted = true; // 如果线程中断了标记
        }
    } finally {
        if (failed) // failed大部分情况下为false不继续执行
            cancelAcquire(node);
    }
}

// 将线程2设置为头节点
private void setHead(Node node) {
    head = node;
    node.thread = null;
    node.prev = null;
}

如果当前线程中断继续执行selfInterrupt

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

// 因为之前调用interrupted导致清除了重置标记,通过selfInterrupt再设置回去
static void selfInterrupt() {
    Thread.currentThread().interrupt();
}

深入探究

线程中断

线程恢复后,会先通过parkAndCheckInterrupt判断当前线程是否阻塞过,并将线程的阻塞标记重置,并将中断标记记录在局部变量interrupted,如果被中断了再通过selfInterrupt将标记设置回去

那为什么不直接调用isInterrupted?

因为如果线程的中断状态如果为true,那么park将无法阻塞.然而acquireQueued方法中的线程会不断的重复阻塞、唤醒,所以必须要将中断标志消耗掉,否则线程将永远无法阻塞

具体参考这篇博客

public static void main(String[] args) throws InterruptedException {
    Thread.currentThread().interrupt();
    LockSupport.park();
    LockSupport.park();
    LockSupport.park(); // 通过测试发现main线程连续park3次都无法阻塞执行
}

由此可见lock方法并不关心线程是否被中断,可以尝试使用lockInterruptibly

public void lockInterruptibly() throws InterruptedException {
    sync.acquireInterruptibly(1);
}

public final void acquireInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted()) // 线程中断直接抛出异常
        throw new InterruptedException();
    if (!tryAcquire(arg)) // 其余代码和acquire类似
        doAcquireInterruptibly(arg);
}

acquireQueued在代码执行,如果当前节点是头节点并且成功获取到锁failed就会设置为false

为了代码的健壮性当failed为默认值true会执行cancelAcquire,例如子类继承AQS并重写了tryAcquire、predecessor获取的节点为null、内存溢出等等情况

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null;
                failed = false; // 执行到这里线程获取到锁failed设置为false
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed) 
            cancelAcquire(node); // 代码出现异常,failed没有设置为false
    }
}

取消在aqs中排队的node

取消节点是尾节点

private void cancelAcquire(Node node) {
    if (node == null)
        return;
    node.thread = null; // 将该节点的thread设置为null
    Node pred = node.prev;
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;
    Node predNext = pred.next; // 获取prev的下一个节点
    node.waitStatus = Node.CANCELLED; // 将该节点设置为取消
	// ... 省略
}

协同取消,当前节点被取消了,顺便把其他的取消节点一并处理了

Node pred = node.prev; // 获取上一个节点
while (pred.waitStatus > 0) { // 跳过取消节点
    pred = pred.prev; // 获取上pred的上一个(上一个节点的上一个)
	node.prev = pred; // 设置为当前节点的上一个
}

脱离AQS队列

此时AQS的头尾节点分别指向sentinel和thread2,thread3和thread4这两个节点已经没有被指针指向,已经不可达,下次GC就会被回收

如果取消的节点是尾节点

将tail从当前节点替换为pred并将,predNext指向null

if (node == tail && compareAndSetTail(node, pred)) {
    compareAndSetNext(pred, predNext, null);
} else {
    // ... 省略
}

取消节点不是不是尾节点或者cas获取设置失败,继续执行

if中的一大堆判断就是为了保证前节点是有效节点,可以被唤醒

int ws;
// 不是头节点的后继节点,并且节点状态不是-1
if (pred != head &&
    ((ws = pred.waitStatus) == Node.SIGNAL ||
	// 如果不是-1,并且状态不是取消则设置为-1
     (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
	// 只有头节点和取消节点的线程为null
    pred.thread != null) {
    Node next = node.next;
    if (next != null && next.waitStatus <= 0)
        compareAndSetNext(pred, predNext, next);
} else {
	// 当前节点是head节点的后继节点 
    unparkSuccessor(node);
}
// 下一个节点指向自己
node.next = node;

这个时候如果在执行shouldParkAfterFailedAcquire,取消的节点才会从队列中彻底断开

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        // ... 省略
    if (ws > 0) {
        do {
            node.prev = pred = pred.prev;  // 跳过掉取消的节点,直到找到一个<=0的节点
        } while (pred.waitStatus > 0); 
        pred.next = node;
    } else {
        // ... 省略
    }
    return false;
}


评论