>  기사  >  Java  >  Java AQS의 구현 원리는 무엇입니까

Java AQS의 구현 원리는 무엇입니까

PHPz
PHPz앞으로
2023-04-23 18:34:08857검색

    Use

    여기서는 AQS의 구현 원리를 이해하기 위해 ReentrantLock을 사용합니다. ReentrantLock来搞清楚AQS的实现原理。

    lock

    这个方法就是开始获取锁运行的入口,在这个方法的实现中,交给了sync对象来获取锁。

    public void lock() {
        sync.acquire(1);
    }
    
    private final Sync sync;
    // Sync对象是一个ReentrantLock实现的内部抽象类,具体的实现又分为了公平版本与非公平两种
    abstract static class Sync extends AbstractQueuedSynchronizer {}
    
    // 在ReentrantLock的无参构造器中,默认使用的实现就是非公平锁的实现
    public ReentrantLock() {
        sync = new NonfairSync();
    }
    
    // 也可以通过带参数的构造器来使用公平锁
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }

    Sync

    由于公平锁FairSyncNonfairSync的差别主要在tryAcquire方法上,别的逻辑都是相同的,因此我们就直接看Sync和AQS中的实现。

    acquire

    方法实现如下,来自AQS的实现:

    // 首先会调用 tryAcquire 和 acquireQueued 方法,如果2个方法都返回true的话,
    // 那么才会调用自行中断的逻辑
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();

    tryAcquire方法就会因为公平锁和非公平锁的差异,有2种不同的实现,首先来看看非公平锁的实现,也就是ReentrantLock的默认策略。

    NonfairSync.tryAcquire

    这个方法会直接调用并返回 Sync 实现的 nonfairTryAcquire(acquires)方法。

    Sync类中的实现

    // 这里的参数 acquires = 1
    final boolean nonfairTryAcquire(int acquires) {
        // 获取当前调用者的线程对象
        final Thread current = Thread.currentThread();
        // 获取AQS中定义的state值,这个state值是AQS的核心之一
        int c = getState();
        // 在ReentrantLock的实现中,state就表示当前是否有线程持有锁,0代表没有线程持有锁,
        // 当前访问的线程就可以继续执行代码,如果大于0则表示当前持有锁的线程的数量。
        // 由于ReentrantLock属于可重入锁,因此,这个值会>=1
        if (c == 0) {
            // 能进来就表示当前没有线程持有锁,那么尝试用CAS获取锁
            if (compareAndSetState(0, acquires)) {
                // 获取锁成功,那么将当前线程设置到AQS中的当前线程中
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        // 如果当前持有锁的就是自己,那么就代表是锁的重入
        else if (current == getExclusiveOwnerThread()) {
            // 累计持有锁的次数
            int nextc = c + acquires;
            // 这里就说明了,state能够设置的最大值就是Int.MAX_VALUE,
            // 当处于MAX_VALUE的时候再加1,那么Int数字的最高位就会变成1,符号位为负
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            // 更新新的state值
            setState(nextc);
            return true;
        }
        return false;
    }

    AQS中的实现

    private volatile int state;
    // 当前持有锁的线程对象
    private transient Thread exclusiveOwnerThread;
    
    protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }

    小结一下,非公平锁tryAcquire方法就是先看看有没有线程持有锁,没有的话自己就通过CAS的方式尝试获取一下锁,如果获取锁成功或者是自己重入,那么tryAcquire方法就会返回true,acquire方法中的条件判断就会直接返回false,lock方法结束,线程继续支持下面的代码。

    FairSync.tryAcquire

    下面来看看公平锁的实现,大体的逻辑跟非公平的是相同的。

    FairSync中的实现

    // 这里的参数 acquire = 1
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        // 判断当前是不是有线程持有锁
        if (c == 0) {
            // 当前没有线程持有锁就进来
            // 由于是公平锁,那么就要保证只有在当前等待队列为空或者队列中等待的线程
            // 都没有到运行的条件的时候,才尝试通过CAS来获取锁。否则就去乖乖排队
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        // 同非公平锁,锁重入
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }

    AQS中的实现

    // 检查当前AQS等待队列中是否有正在等待的有效线程节点
    public final boolean hasQueuedPredecessors() {
        Node h, s;
        // 首先让参数h指向当前队列的头部
        if ((h = head) != null) {
            // 队列不为空
            // 将临时变量赋值为当前第二个节点
            // 这里需要简单说明一下AQS的等待队列的构成,第一个节点是没有业务含义的,
            // 只是用作唤醒下一个待执行的线程节点
            if ((s = h.next) == null || s.waitStatus > 0) {
                // 能进来就表示当前队列只有一个头结点,或者第二个节点的状态是已取消
                // - > 参考说明1
                // 如果第二个节点不为空,那么就释放这个引用
                s = null; // traverse in case of concurrent cancellation
                // 从后往前遍历,找到距离队列头最近的有效节点
                for (Node p = tail; p != h && p != null; p = p.prev) {
                    if (p.waitStatus <= 0)
                        s = p;
                }
            }
            // 如果找到了正在队列中的排队的有效节点并且不是当前访问的线程,那么就返回true
            if (s != null && s.thread != Thread.currentThread())
                return true;
        }
        // 头结点指向NULL,那么说明队列是空的,直接返回false
        return false;
    }

    说明1:这里就引入了队列节点中的等待状态这个重要的概念,在ReentrantLock中,我们只需要关注CANCELLEDSIGNAL即可。只有CANCELLED是大于0的,新节点的默认值为0。因此只要等待状态大于0就代表该节点被取消了。

    // 等待队列中节点的等待状态
    volatile int waitStatus;
    // 当前节点因为等待超时或者被中断了被取消
    static final int CANCELLED =  1;
    // 接下来有资格被唤醒获得锁的标记,只有获得了这个标记的节点才能被执行完的线程唤醒
    static final int SIGNAL    = -1;

    小结一下,公平锁对比非公平锁,在最开始有机会获取锁的时候,会先检查一下当前队列中是否已经有线程在排队等待执行了,如果等待队列中是空的或者没有有效的排队节点,才会获取锁。如果获取锁成功,或者锁重入成功,那么同样会结束AQS的逻辑,继续执行业务代码。

    acquireQueued

    上面分析完在tryAcquire方法中如果成功获得锁的情况,就会结束AQS的逻辑,接下来就来分析未能成功获得锁的逻辑,即:acquire方法中条件判断的第二个条件判断。

    在AQS中实现

    // 这个方法就会将当前线程添加到等待队列中,并且返回操作是否成功,arg就是传入的acquire值,为1
    acquireQueued(addWaiter(Node.EXCLUSIVE), arg)

    首先通过addWaiter方法构建一个包含线程对象的节点并且添加到队列中

    // 这里的参数为 Node.EXCLUSIVE,表示这是一个排它锁的实现,这里的值为NULL
    private Node addWaiter(Node mode) {
        // 构建一个线程节点
        Node node = new Node(mode);
    
        // 这里就是AQS的核心理念了,通过不断的自旋,将线程节点插入到队列中
        for (;;) {
            // 获取原来的队列的队尾,因为AQS才去的尾插方式
            Node oldTail = tail;
            if (oldTail != null) {
                // 将新插入的节点指向原来的尾结点
                node.setPrevRelaxed(oldTail);
                // 通过CAS的方式将当前节点设置到线程共享队列的尾部去,这里要注意,
                // 凡是涉及到多线程操作的属性,都需要通过CAS保证操作的原子性
                if (compareAndSetTail(oldTail, node)) {
                    oldTail.next = node;
                    // 设置成功,就返回插入的节点对象;如若不成功,
                    // 就表示有别的线程也修改了尾结点,那么就要等下一次循环重试
                    return node;
                }
            } else {
                // 没有尾结点说明队列不存在,那么就进行初始化
                initializeSyncQueue();
            }
        }
    }
    
    // 初始化等待队列
    private final void initializeSyncQueue() {
        Node h;
        // 还是通过CAS的方式,给队列初始化一个默认的Node节点,几个重要的属性的初始值如下
        // waitStatus = 0; thread = null
        if (HEAD.compareAndSet(this, null, (h = new Node())))
            tail = h;
    }

    小结一下,addWaiter方法通过尾插的方式将没有抢到锁的线程封装为Node节点,插入到AQS的等待队列中。如果队列还未初始化,那么就先初始化队列,队列自带一个无实际含义的头结点。

    acquireQueued

    在AQS中实现

    // arg就是传入的acquire参数,为1;该方法返回值的含义为,线程在等待过程中是否被中断
    final boolean acquireQueued(final Node node, int arg) {
        boolean interrupted = false;
        try {
            // 还是不断的自旋,等待机会进行操作
            for (;;) {
                // 获取新插入节点的上一个节点
                final Node p = node.predecessor();
                // 如果上一个节点已经是头结点,那么说明当前就已经轮到当前线程获取锁执行业务了
                // 那么就再次尝试抢一次锁
                if (p == head && tryAcquire(arg)) {
                    // 能进来就说明获取锁成功了
                    // 那么就将当前线程的节点设置为头结点
                    // 在这个方法中,会去除节点中的信息,做一个纯粹的头结点
                    setHead(node);
                    // 将已经没有指向的原头结点的next指为空,等待回收
                    p.next = null; // help GC
                    // 这里返回的值为false,因为当前线程并未被阻塞就获得了锁
                    return interrupted;
                }
                // 走到这里说明当前线程并没有获取到锁,那么就要考虑是否要将线程阻塞了
                if (shouldParkAfterFailedAcquire(p, node))
                    interrupted |= parkAndCheckInterrupt();
            }
        } catch (Throwable t) {
            cancelAcquire(node);
            if (interrupted)
                selfInterrupt();
            throw t;
        }
    }
    
    private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }
    
    // 线程获取锁失败之后,是否需要将线程阻塞,这里2个参数,
    // pred是新插入节点的上一个节点,node是新插入的节点
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        // 获取上一个节点的等待状态
        int ws = pred.waitStatus;
        // 判断上一个节点的状态是不是SIGNAL,只有状态为SIGNAL的才是有效的可指向节点
        if (ws == Node.SIGNAL)
            // 如果上一个节点是SIGNAL状态,那就说明当前线程可以连接上该节点,然后被挂起了
            return true;
        // 上面我们提到过,只有节点被取消了,等待状态才会>0
        if (ws > 0) {
            // 一直往前找,直到找到等待状态<=0的,数据规范的话即找到最近的一个等待状态
            // 为SIGNAL的节点,跳过全部被取消的节点
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            // 此时pred指向的即第一个合法的线程节点,指向当前新插入的节点
            pred.next = node;
        } else {
            // 这里的意思就是上一个节点就是有效节点,那么就将上一个节点的等待状态强制
            // 更新为SIGNAL,即-1。毫无意义的那个头结点也会被设置为SIGNAL状态
            pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
        }
        return false;
    }

    小结一下,这个方法的含义就是,由于当前线程没有获取到锁资源,因此就需要被阻塞。同时在这个方法中,也会整理等待队列,将那些已经被取消的节点从队列中移除。接着就是调用条件判断中的执行方法,将线程挂起阻塞起来。

    private final boolean parkAndCheckInterrupt() {
        // 调用了park方法,底层是调用UnSafe类的park方法来实现
        LockSupport.park(this);
        return Thread.interrupted();
    }

    至此,lock

    lock

    이 메서드는 잠금 작업을 획득하기 위한 진입점입니다. 이 메서드 구현에서는 잠금을 획득하기 위해 sync 개체가 넘겨집니다.

    public void unlock() {
        sync.release(1);
    }

    Sync

    공정한 잠금 FairSyncNonfairSync의 차이점은 주로 tryAcquire 메서드에 있으므로 다른 논리 동일하므로 Sync 및 AQS의 구현을 직접 살펴보겠습니다.

    acquire

    메서드는 AQS 구현에서 다음과 같이 구현됩니다.
    public final boolean release(int arg) {
        // 调用tryRelease方法真正实现解除锁
        // 只有state加的所有锁被解除了,那么才会唤醒下一个线程
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    tryAcquire 메서드에는 공정한 잠금과 불공정한 잠금의 차이로 인해 두 가지 다른 구현이 있습니다. 먼저 ReentrantLock의 기본 전략인 불공정 잠금 구현을 살펴보겠습니다.

    NonfairSync.tryAcquire

    이 메서드는 Sync에서 구현한 nonfairTryAcquire(acquires) 메서드를 직접 호출하고 반환합니다.

    Sync 클래스에서 구현

    // 这里的releases就是传入的参数1,即如果是重入锁,那么这里需要解锁多次
    protected final boolean tryRelease(int releases) {
        // 计算state的值,这里的含义就是state-1
        int c = getState() - releases;
        // 如果当前线程不是持有锁的线程,那么就报错
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        // 锁是否完全释放完的标记
        boolean free = false;
        // state减到0说明锁已经完全释放完了
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        // 更新state的值
        setState(c);
        // 只有锁被完全释放完,才返回true
        return free;
    }

    AQS에서 구현

    // 唤醒下一个需要锁的线程,这里的node是头结点
    private void unparkSuccessor(Node node) {
        // 获得头结点的等待状态
        int ws = node.waitStatus;
        // 如果头结点是SIGNAL,那么重置为0,因为这个节点已经没有意义了,会被移除
        if (ws < 0)
            node.compareAndSetWaitStatus(ws, 0);
        // 获得头结点后面的待唤醒的节点
        Node s = node.next;
        // 如果这个待唤醒的节点为空或者等待状态不正确,在这里就是不等于SIGNAL
        if (s == null || s.waitStatus > 0) {
            s = null;
            // 从尾部开始查询,找到合法的待唤醒节点
            for (Node p = tail; p != node && p != null; p = p.prev)
                if (p.waitStatus <= 0)
                    s = p;
        }
        if (s != null)
            // 唤醒线程
            LockSupport.unpark(s.thread);
    }

    요약하자면, 불공정 잠금 tryAcquire 방법은 먼저 잠금을 보유하는 스레드가 있는지 확인하는 것입니다. 직접 할 수도 있습니다. CAS를 통해 잠금을 획득해 보세요. 잠금이 성공적으로 획득되거나 다시 입력되면 tryAcquire 메서드가 true를 반환하고 에 조건부 판단이 적용됩니다. acquire 메서드는 false를 직접 반환하고 잠금 메서드는 종료되며 스레드는 다음 코드를 계속 지원합니다.

    FairSync.tryAcquire

    공정 잠금 구현을 살펴보겠습니다. 일반적인 논리는 불공정 잠금과 동일합니다.

    FairSync의 구현

    private void cancelAcquire(Node node) {
        // 健壮性判断,节点非空才可以被取消
        if (node == null)
            return;
    
        // 将节点中的线程指向去掉
        node.thread = null;
    
        // 获取到当前节点的上一个节点
        Node pred = node.prev;
        // 通过循环,跳过所有当前被取消节点之前的也已经被标记取消的节点
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;
    
        // 通过上面的循环以后pred的值就为等待队列中队尾的一个合法的未被取消的节点
        // 获取到该合法节点下一个指向的节点
        Node predNext = pred.next;
    
        // 标记当前被取消的节点的等待状态为被取消
        node.waitStatus = Node.CANCELLED;
    
        // 如果被取消的节点就是队尾的节点,那么就通过CAS将pred设置为尾结点
        // 就可以抛弃中间那些同样被标记为被取消的节点,如果有的是
        if (node == tail && compareAndSetTail(node, pred)) {
            // 将pred的下一个节点指向NULL,因为pred现在就是队尾
            pred.compareAndSetNext(predNext, null);
        } else {
            // 这说明取消的节点不是尾结点,而是中间的节点
            // 这个值会在下面的条件判断被赋值为上一个节点的等待状态
            int ws;
            // 如果找到的上一个合法节点不是头结点
            // 并且上一个节点的等待状态是SIGNAL,并且将不是SIGNAL状态的负数状态转换为SIGNAL
            // 并且线程不为空
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && pred.compareAndSetWaitStatus(ws, Node.SIGNAL))) &&
                pred.thread != null) {
                // 能进来就说明被取消的节点处于中间,那么就要将这个node从队列中跳过
                Node next = node.next;
                // 如果被取消的节点是一个有效的节点,不为空并且状态也是对的
                if (next != null && next.waitStatus <= 0)
                    // 那么就将上一个节点指向被取消节点的下一个节点
                    pred.compareAndSetNext(predNext, next);
            } else {
                // 能进入这里说明上一个合法节点已经是头结点了,
                // 那么就说明被取消的这个节点已经是原本除了头结点以外的最靠前面的节点,
                // 那么被取消的这个节点其实就等价于头结点了,应该唤醒后面还在等待的线程节点
                // 唤醒下一个被挂起的线程,具体已经分析过了,这里就省略了
                unparkSuccessor(node);
            }
    
            node.next = node;
        }
    }

    🎜Implementation in AQS🎜🎜rrreee🎜참고 1: 여기서는 대기열 노드의 대기 상태에 대한 중요한 개념이 ReentrantLock에 도입됩니다. 취소됨신호. CANCELLED만 0보다 크고 새 노드의 기본값은 0입니다. 따라서 대기 상태가 0보다 큰 경우 노드가 취소되었음을 의미합니다. 🎜rrreee🎜요약하자면, 공정한 잠금은 불공평한 잠금과 비교하여 잠금을 처음 획득할 기회가 있을 때 현재 대기열에 실행을 기다리는 스레드가 있는지 먼저 확인합니다. 대기 대기열이 비어 있거나 유효하지 않으면 대기열이 실행됩니다. 노드가 잠금을 획득합니다. 잠금이 성공적으로 획득되거나 잠금이 다시 입력되면 AQS 논리도 종료되고 비즈니스 코드가 계속 실행됩니다. 🎜🎜acquireQueued🎜🎜위의 상황을 분석한 후 tryAcquire 메서드에서 잠금을 성공적으로 획득하면 AQS의 논리가 종료됩니다. 다음으로 잠금 획득에 실패하는 논리를 분석해 보겠습니다. acquire 메소드의 조건부 판정 중 두 번째 조건부 판정입니다. 🎜🎜🎜AQS에서 구현됨🎜🎜rrreee🎜먼저 addWaiter 메서드를 통해 스레드 객체가 포함된 노드를 구성하고 이를 대기열에 추가합니다🎜rrreee🎜요약하자면 addWaiter method pass tail 삽입 방법은 잠금을 확보하지 못한 스레드를 Node 노드로 캡슐화하고 이를 AQS의 대기 대기열에 삽입합니다. 큐가 초기화되지 않은 경우 먼저 큐를 초기화하세요. 큐에는 실제 의미가 없는 헤드 노드가 함께 제공됩니다. 🎜🎜acquireQueued🎜🎜🎜AQS에 구현됨🎜🎜rrreee🎜요약하자면 이 방법의 의미는 현재 스레드가 잠금 리소스를 획득하지 못했기 때문에 차단해야 한다는 것입니다. 동시에 이 방법에서는 대기 중인 대기열도 정렬되고 취소된 노드는 대기열에서 제거됩니다. 다음 단계는 조건부 판단에서 실행 메서드를 호출하여 스레드를 일시 중지하고 차단하는 것입니다. 🎜rrreee🎜이 시점에서 lock 메서드의 모든 세부 사항은 명확합니다. 스레드가 잠금을 얻을 수 있으면 잠금 단계를 직접 종료합니다. 대기 큐에 진입하기 전에 아직 잠금을 획득할 기회가 있는 것으로 확인되면 다시 획득을 시도하며, 테일 삽입을 통해 대기 큐에 진입합니다. 스레드는 LockSupport.park 메서드를 호출하여 차단되고 깨어나기를 기다립니다. 🎜🎜unlock🎜🎜이 메소드를 적극적으로 호출하면 잠금 점유가 종료되었음을 의미합니다. 🎜🎜🎜ReentrantLock에서 구현됨🎜🎜rrreee🎜🎜AQS에서 구현🎜🎜rrreee🎜🎜Sync에서 구현🎜🎜rrreee🎜🎜AQS에서 구현🎜🎜rrreee🎜cancelAcquire🎜 🎜 이 방법은 잠금을 획득하기 위해 대기 중인 스레드를 취소하는 기능을 제공합니다. . 🎜🎜🎜 기능은 AQS 🎜🎜rrreee에서 구현됩니다.

    위 내용은 Java AQS의 구현 원리는 무엇입니까의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

    성명:
    이 기사는 yisu.com에서 복제됩니다. 침해가 있는 경우 admin@php.cn으로 문의하시기 바랍니다. 삭제