Single_Yam 阅读(13) 评论(0)

Synchronized 关键字结合对象的监视器,JVM 为我们提供了一种『内置锁』的语义,这种锁很简便,不需要我们关心加锁和释放锁的过程,我们只需要告诉虚拟机哪些代码块需要加锁即可,其他的细节会由编译器和虚拟机自己实现。

可以将我们的『内置锁』理解为是 JVM 的一种内置特性, 所以一个很显著的问题就是,它不支持某些高级功能的定制,比如说,我想要这个锁支持公平竞争,我想要根据不同的条件将线程阻塞在不同的队列上,我想要支持定时竞争锁,超时返回,我还想让被阻塞的线程能够响应中断请求,等等等等。

这些特殊的需求是『内置锁』满足不了的,所以在 JDK 层面又引入了『显式锁』的概念,不再由 JVM 来负责加锁和释放锁,这两个动作释放给我们程序来做,程序层面难免复杂了些,但锁灵活性提高了,可以支持更多定制功能,但要求你对锁具有更深层次的理解。

Lock 显式锁

Lock 接口位于 java.util.concurrent.locks 包下,基本定义如下:

public interface Lock {
    //获取锁,失败则阻塞
    void lock();
    //响应中断式获取锁
    void lockInterruptibly()
    //尝试一次获取锁,成功返回true,失败返回false,不会阻塞
    boolean tryLock();
    //定时尝试
    boolean tryLock(long time, TimeUnit unit)
    //释放锁
    void unlock();
    //创建一个条件队列
    Condition newCondition();
}

Lock 定义了显式锁应该具有的最基本的方法,各个子类的实现应该具有更加复杂的能力,整个 Lock 的框架如下:

image

其中,显式锁的实现类主要有三个,ReentrantLock 是其最主要的实现类,ReadLock 和 WriteLock 是 ReentrantReadWriteLock 内部定义的两个内部类,他们继承自 Lock 并实现了其定义的所有方法,精细化读写分离。而 ReentrantReadWriteLock 向外提供读锁写锁。

至于 LockSupport,它提供了阻塞和唤醒一个线程的能力,当然内部也是通过 Unsafe 类继而调用操作系统底层的 API 来实现的。

AbstractQueuedSynchronizer 你可以叫它队列同步器,也可以简称它为 AQS,它是我们实现锁的一个核心,本质上就是个同步机制,记录当前占有锁的线程,每一个想要获取锁的线程都需要通过这个同步机制来判断自己是否具备占有该锁的条件,如果不具备则阻塞等待,否则将占有锁,修改标志,这一点我们后续会详细分析。

ReentrantLock 的基本理解

ReentrantLock 作为 Lock 显式锁的最基本实现,也是使用最频繁的一个锁实现类。它提供了两个构造函数,用于支持公平竞争锁。

public ReentrantLock()

public ReentrantLock(boolean fair)

默认无参的构造函数表示启用非公平锁,当然也可以通过第二个构造函数传入 fair 参数值为 true 指明启用公平锁。

公平锁和非公平锁的区别之处在于,公平锁在选择下一个占有锁的线程时,参考先到先得原则,等待时间越长的线程将具有更高的优先级。而非公平锁则无视这种原则。

两种策略各有利弊,公平策略可以保证每个线程都公平的竞争到锁,但是维护公平算法本身也是一种资源消耗,每一次锁请求的线程都直接被挂在队列的尾部,而只有队列头部的线程有资格使用锁,后面的都得排队。

那么假设这么一种情况,A 获得锁正在运行,B 尝试获得锁失败被阻塞,此时 C 也尝试获得锁,失败而阻塞,虽然 C 只需要很短运行时间,它依然需要等待 B 执行结束才有机会获得锁来运行。

非公平锁的前提下,A 执行结束,找到队列首部的 B 线程,开始上下文切换,假如此时的 C 过来竞争锁,非公平策略前提下,C 是可以获得锁的,并假设它迅速的执行结束了,当 B 线程被切换回来之后再去获取锁也不会有什么问题,结果是,C 线程在 B 线程的上下文切换过程中执行结束。显然,非公平策略下 CPU 的吞吐量是提高的。

但是,非公平策略的锁可能会造成某些线程饥饿,始终得不到运行,各有利弊,适时取舍。庆幸的是,我们的显式锁支持两种模式的切换选择。稍后我们将分析其中实现的细节之处。

ReentrantLock 中有以下三个内部类是比较重要的:

image

内部类 Sync 继承自我们的 AQS 并重写了部分方法,NonfairSync 和 FairSync 是 Sync 的两个子类,分别对应公平锁和非公平锁。

为什么这么做呢?

image

类 Sync 中有一个 lock 方法,而公平策略下的 lock 方法和非公平策略下的 lock 方法应该具有不同的实现,所以这里并没有写死,而是交由子类去实现它。

这其实是一种典型的设计模式,『模板方法』。

关于 AQS,我们稍后做详细的分析,这里你把它理解为一个用于记录保存当前占有锁线程信息和阻塞在该锁上所有线程信息的容器即可。

接着看 ReentrantLock,你会发现,无论是 lock 方法,lockInterruptibly 方法、tryLock 或是 unlock 方法都是透传调用 sync 的相关方法,也即 AQS 中的相关方法。

下面我们就深入源码去分析分析这个 AQS 的实现情况。

AQS 的基本原理

AQS 就是我们的 AbstractQueuedSynchronizer,你可以把它理解为一个容器,它是一个抽象类,有一个父类 AbstractOwnableSynchronizer。这个父类的职责很简单,有一个 Thread 类型的成员属性,就是用来给 AQS 保存当前占有锁的线程的。

除此之外,AQS 中还定义了一个静态内部类 Node,是一个双向链表的数据结构。AQS 中自然还对应两个指针,队列头指针,尾指针。

int 类型的属性 state 也是一个十分重要的成员,值为零表示当前锁无任何线程持有,值为一说明有一个线程持有该锁未释放,大于一说明持有该锁的线程多次重入。

AQS 中定义了很多的方法,有公共的,有私有的,这里不一一赘述,我们从 ReentrantLock 的 lock 和 unlock 入手,分析它一路调用的方法,以非公平锁为例。

public void lock() {
    sync.lock();
}

ReentrantLock 的 lock 方法直接调用的 sync 的 lock 方法,而我们说过 sync 中定义的 lock 方法是一个抽象方法,具体实现在子类中,NonfairSync 的 lock 方法实现如下:

final void lock() {
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}

逻辑很简单,尝试使用 CAS 更新 state 的值为 1,表示当前线程尝试占有该锁,如果成功,说明 state 的值原本是一,也即锁无任何线程占用,于是将当前线程保存到父类的 Thread 字段中。

如果更新失败,那么说明锁已经被持有,需要挂起当前线程,于是调用 acquire 方法(AQS中的方法)。

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

tryAcquire 被子类 Sync 重写了,所以这里调用的是 NonfairSync 的 tryAcquire 方法。

protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
    }
    return false;
}

这段代码并不复杂是,主要逻辑是,如果 state 为零,说明刚才占有锁的线程释放了锁资源,于是尝试占有锁,否则判断一下占有锁的线程是否是当前线程,也就是判断一下是否是重入锁操作,如果是则增加重入次数即可。

关于返回值,如果是占有锁成功或者重入锁成功都将返回 true,否则统一返回 false。

接着看,

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

如果 tryAcquire 方法返回 true,外层 acquire 返回并结束 lock 方法的调用,否则说明占有锁失败并准备将当前线程阻塞,具体的阻塞情况我们继续分析。

addWaiter 方法用于将当前线程包装成一个 Node 结点并添加到队列的尾部,我们看看源代码:

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

代码比较简单,不再啰嗦了,这个方法最终会导致当前线程挂在等待队列的尾部。

添加到等待队列之后会回到 acquireQueued 方法,这个方法会做最后一次尝试获取锁,如果依然失败则调用 LockSupport 方法挂起该线程。

image

整个方法的核心逻辑被写在了死循环之中,循环体的前半部分再一次尝试获取锁,这里需要注意,head 指向的结点并不是队列中有效的等待线程,head 的 next 指针指向的结点才是第一个有效的等待线程。

也就是说,如果那个结点的前驱结点是 head,那么它就是锁的第一有效继承人。

如果依然失败了,会先调用 shouldParkAfterFailedAcquire 判断是否应该阻塞当前线程,这个方法在大部分情况下会返回 true,在某些特殊情况下会返回 false。

然后 parkAndCheckInterrupt 将直接阻塞当前线程,调用 LockSupport 的 park 方法。整个获取锁的过程基本上就算结束了,接着我们如何解除阻塞。

public void unlock() {
    sync.release(1);
}

unlock 调用 AQS 的 release 方法,

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

先调用 tryRelease 尝试性释放,

protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

如果当前线程不是拥有锁的线程,那么直接抛出异常,这是必要的异常点判断。

如果 c 等于零,说明自己并没有多次重入该锁,清空 exclusiveOwnerThread 字段即可,并修改 state 状态。这段代码没有加同步逻辑的原因是,unlock 方法只能由占有锁的线程进行调用,同一时刻只会有一个线程能够调用成功。

假如 c 不等于零,也就是当前线程多次重入该锁,state 虽然会被减一修改,而 tryRelease 却会返回 false,这一点需要注意。我们再回到 release 方法,

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

可以看到,如果由于线程的多次重入 tryRelease 返回 false 了,最终导致的是我们的 unlock 方法返回 false。

换句话说,你重入多少次锁,你就需要手动调用多少次 unlock,而只有最后一次的 unlock 方法返回的是 true,这就是原理。

而假如我们的 tryRelease 调用成功并返回 true,unparkSuccessor 方法就会去 unpark 我们的队列首个有效的结点所对应的线程。unparkSuccessor 比较简单,不涉及任何同步机制,这里不再赘述了。

总的来说,unlock 要比 lock 简单很多,原因在于,unlock 不需要同步机制,只有获得锁的线程才能够调用,不存在并发访问,而 lock 方法则不一样,会面临大量线程同时访问。

我们回到 acquireQueued 方法,

image

线程被唤醒后,会从上一次被阻塞的位置起重新开始执行代码,也就是线程会苏醒于 parkAndCheckInterrupt 方法中,

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);  //这里开始苏醒
    return Thread.interrupted();
}

第一件事情,调用 interrupted 方法,而这个方法用于判断当前线程在阻塞期间是否被中断。

如果遭遇中断,会进入 if 判断体,记录一下,用于方法返回。被唤醒的线程将重新从循环体首部开始,再次尝试去竞争锁,直到位于等待队列中自己之前的全部结点全部出队后,才能有机会获取到锁并返回中断标志。

所以来说,在死循环中阻塞一个线程是我们一种较为常见的阻塞模式,目的就是为了方便它被唤醒之后能够有机会重新竞争相关的锁资源。

以上,我们完成了对 ReentrantLock 这种独占式锁的加锁和释放锁的相关原理的一个介绍,关于读写分离的 ReentrantReadWriteLock 锁,它其实是共享锁和独占锁的一个结合,相对更加复杂,我们下一篇单独来分析。

除此之外的 ReentrantLock 中其他相关的一些响应中断的获取锁方法,支持超时返回的相关方法等,无一例外的依赖我们上述介绍的原理,相信大家有能力自行参透。

好了,本篇文章到此结束,大家期待下篇文章吧。