dcddc

西米大人的博客

0%

系统学习AQS

AQS概述

AbstractQueuedSynchronizer(AQS)是java并发编程基础,concurrent包下面的许多同步组件底层都是基于AQS,例如CountDownLatch、ReentrantLock、ReadWriteLock、Semaphare…

AQS通过一个FIFO队列(双向链表数据结构),管理被阻塞的线程,同时具备阻塞park和唤醒unpark线程的能力

  • park和unpark追进去发现是native方法,应该是通过底层操作系统接口来调度线程

AQS定义一个volatile int state,代表共享资源,同步组件实现AQS提供的模板方法,使用这个变量来实现多线程对共享资源(锁)的竞争,达到线程间同步的功能

  • 例如可重入锁用state表示同一线程对锁的重入次数,CDL用state表示签到阈值,达到阈值后唤醒所有等待CDL的线程

AQS与同步组件的关系图

源码分析

acquire方法

acquire是AQS提供的独占模式获取锁的入口

  • 首先执行tryAcquire方法(由具体的子类实现,判断线程是否能获取锁)。如果失败,表示该线程获取独占锁失败,如果成功,表示当前线程拿到了独占锁,可以继续执行加锁代码块。
  • 获取独占锁失败后,调用addWaiter方法,将当前线程加入到等待队列尾部
    • 这里用到了自旋CAS更新tail来保证并发时等待队列的线程安全,tail用volatile修饰保证可见性
  • 将队列中代表当前线程的node节点传递给acquireQueued方法,尝试再次获取独占锁。具体逻辑为:
    • 如果node节点的前驱节点是头结点,就再次尝试获取到锁(tryAcquire)
    • 如果获取锁成功,将该节点设置为头结点(等待线程一定是头结点的后继节点,头结点表示获得了锁的线程),方法return,当前线程执行加锁代码块。
      • 如果线程在阻塞过程中被中断,只是设置线程中断标,在线程拿到锁继续执行时,让程序自己决定如何处理中断
      • 注意,reentrantLock(AQS的同步组件)的lock方法不会在阻塞过程中抛出中断异常,所以一旦死锁无法通过中断来恢复线程。不过,tryLock(long timeout, TimeUnit unit)和lockInterruptibly方法都能使线程在阻塞过程中被中断时抛出中断异常
    • 如果获取失败,将当前节点的线程挂起
    • 自旋这一过程,保证一旦当前线程恢复执行时可以立即尝试获取锁
1
2
3
4
5
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
private Node enq(final Node node) {
for (;;) { //for循环,直到队列构造成功
Node t = tail;
if (t == null) { //第一次循环:队列为空,就构造一个节点设置为头结点
if (compareAndSetHead(new Node()))
tail = head;
} else {//第二次循环执行到这里 首先将node节点的前驱节点指向尾节点,然后CAS设置node节点为尾节点
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
final boolean acquireQueued(final Node node, int arg) {
boolean interrupted = false;
try {
// 自旋这一过程,保证一旦当前线程恢复执行时可以立即尝试获取锁
for (;;) {
final Node p = node.predecessor();
// 如果node节点的前驱节点是头结点,就再次尝试获取到锁
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
return interrupted;
}
// 如果获取失败,将当前节点的线程挂起
if (shouldParkAfterFailedAcquire(p, node))
interrupted |= parkAndCheckInterrupt();
}
} catch (Throwable t) {
cancelAcquire(node);
if (interrupted)
selfInterrupt();
throw t;
}
}

独占模式获取锁小结:

  • AQS的阻塞队列实际是双向链表实现的
  • tryAcquire直接拿到锁的线程,不会加入阻塞队列。否则,加入阻塞队列,线程阻塞
  • 当持有锁的线程调用tryRelease释放锁成功后,会唤醒阻塞队列Head节点的后继节点线程,让它再次尝试tryAcquire获取锁
  • 获取锁成功,则它成为Head节点,所以可以认为阻塞队列的Head节点代表的就是持有锁的线程

release方法

release是AQS提供的独占模式释放锁的入口。

  • 调用子类的tryRelease方法尝试释放独占锁
    • 如果失败,直接返回
    • 如果成功。唤醒(unpark)等待队列头节点的后继节点对应的线程
    • 唤醒后的线程会进入acquireQueued的自旋里尝试获取独占锁,参考上文
      1
      2
      3
      4
      5
      6
      7
      8
      9
      public final boolean release(int arg) {
      if (tryRelease(arg)) {
      Node h = head;
      if (h != null && h.waitStatus != 0)
      unparkSuccessor(h);
      return true;
      }
      return false;
      }

独占模式释放锁小结:

  • 释放锁时,会唤醒AQS阻塞队列上的后继节点线程,让它tryAcquire尝试获取独占锁

acquireShared方法

acquireShared是AQS提供的共享模式获取锁的入口

  • 尝试获取共享锁
    • 如果成功,获取到共享锁,执行加锁代码块
    • 如果失败(小于0)
      • 构建一个共享节点添加到等待队列。
      • 当前节点是头结点的后继节点时,再次尝试获取共享锁。
        • 成功,在setHeadAndPropagate方法里做两件事。1、当前节点设为头节点。2、唤醒当前节点的后继节点。最后方法返回,线程执行加锁代码块。第2点是独占锁模式没有的,也是和独占锁模式的最大区别。因为独占锁任何时候只有一个线程获得锁,所以唤醒后继节点只有在当前线程释放锁时进行。但共享锁同时可以有多个线程获取锁,所以在获得共享锁后就可以唤醒后继节点。详细分析参考:https://segmentfault.com/a/1190000016447307
        • 如果获取失败,将当前节点的线程挂起。
        • 自旋这一过程,保证一旦当前线程恢复执行时可以立即尝试获取锁
1
2
3
4
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean interrupted = false;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
return;
}
}
if (shouldParkAfterFailedAcquire(p, node))
interrupted |= parkAndCheckInterrupt();
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
} finally {
if (interrupted)
selfInterrupt();
}
}

共享模式获取锁小结:

  • 和独占模式最大的区别就是,阻塞队列里的线程获取锁成功后,继续唤醒后继节点线程,因为共享模式锁是可以被多个线程共享的。独占模式没有这一步,只会在释放锁时唤醒后继节点线程,保证锁任何时候只被一个线程独占

releaseShared方法

releaseShared是AQS提供的共享模式释放锁的入口,和释放独占锁的逻辑一致,也是唤醒阻塞队列的后继节点线程

1
2
3
4
5
6
7
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}

综上,AQS对独占锁模式和共享锁模式的实现方式,最大的区别就是独占锁模式只有在线程释放锁后才唤醒后继线程,共享锁模式在线程拿到共享锁和释放共享锁时,都会唤醒后继线程,这么做的效果是唤醒线程的操作会递归向下传播,可以同时有多个线程被唤醒,拿到共享锁执行加锁代码块。独占锁只在锁释放时唤醒下一个阻塞线程,保证任何时候锁只被一个线程独占

AQS的应用

一些同步组件内部通过赋予AQS的state属性不同含义,实现了各自的AQS,达到线程间同步的功能

ReentrantLock

ReentrantLock是可重入的独占锁,又细分为公平锁和非公平锁,默认非公平锁

1
2
3
4
5
6
7
public ReentrantLock() {
sync = new NonfairSync();
}

public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}

ReentrantLock中的state表示锁被重入次数,每次重入成功+1

tryAcquire

  • 如果state=0,说明独占锁未被占据
    • 对于公平锁,判断阻塞队列头结点是否存在后继节点且代表的线程不是当前线程,即是否有更早等待锁的其他线程
      • 如果没有,则state通过CAS置1,然后缓存当前线程作为独占线程,尝试获取独占锁成功
      • 如果有,获取独占锁失败,已经有其他线程更早地等待锁,因此只能走AQS流程入队阻塞,乖乖排队等待被唤醒。这就是公平性的实现,因为AQS是FIFO队列,先到先得
    • 对于非公平锁,直接CAS尝试改state状态到1,成功则获取到锁,失败说明被其他线程抢先CAS State成功,锁被抢占,只能走AQS流程入队阻塞,乖乖排队等待被唤醒。这就是非公平性的实现
  • 如果state不为0,说明独占锁已被其他线程占据,判断是否就是当前线程
    • 如果是,则更新state来标识已重入次数,成功则获取到可重入锁。这就是可重入性的实现,通过判断当前持有锁的线程是否就是自己来决定是否尝试获取锁成功
    • 如果不是,则无法获取锁,走AQS流程入队阻塞,等待被唤醒

因为state用volatile保证可见性,所以更新state无需加锁

公平锁tryAcquire

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

非公平锁tryAcquire

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

tryRelease

  • 因为可重入性,所以先state自减
    • 当state=0时,表示线程已经把可重入锁全部释放,则认为可以将独占锁释放,让下一个等待线程(阻塞队列头结点的后继节点)尝试获取锁
    • state != 0,说明还在重入中,不能释放锁
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      protected final boolean tryRelease(int releases) {
      int c = getState() - releases;
      if (Thread.currentThread() != getExclusiveOwnerThread())
      throw new IllegalMonitorStateException();
      boolean free = false;
      if (c == 0) {
      free = true;
      setExclusiveOwnerThread(null);
      }
      setState(c);
      return free;
      }

小结

  • 公平锁通过FIFO阻塞队列来实现,前面有阻塞的线程节点则只能乖乖去队尾排队。非公平锁尝试获取锁时,无需关心队列里有没有阻塞的线程节点,直接通过CAS修改state来尝试获取锁,成功就能直接抢到锁
  • 通过判断持有锁的线程是否为当前线程来实现可重入性,每次重入成功则state+1,通过state表示重入次数。释放锁时state-1,减到0则可以唤醒后继阻塞的线程

CountDownLatch

CountDownLatch是基于AQS共享锁的同步组件,功能类似签到器,state表示签到阈值,签到数达到阈值后,唤醒所有等待CDL的线程

CDL构造函数指定签到阈值,保存到AQS的state

1
2
3
4
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}

tryReleaseShared

线程执行完逻辑后,调用countDown方法签到,state减1,然后判断state是否减到0,如果是说明全部签到完成,释放共享锁,唤醒所有阻塞队列上的线程(共享锁的机制,上面介绍过,唤醒的线程会递归唤醒后继节点的线程)

1
2
3
public void countDown() {
sync.releaseShared(1);
}

底层调用AQS的释放共享锁方法,CDL对tryReleaseShared的实现,其实就是每次将state-1,如果state减到0(调用次数达到计数阈值),则认为可以释放共享锁,唤醒所有等待CDL共享锁的线程。

1
2
3
4
5
6
7
8
9
10
11
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c - 1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}

tryAcquireShared

线程调用await方法,将等待在签到器上,直到其他线程全部签到完成才能获取到CDL锁继续执行任务。CDL使用共享锁的原因是等待CDL锁的线程可能有多个,当CDL签到完成后,希望唤醒所有的线程,即所有线程都获取到CDL锁

tryAcquireShared尝试获取共享锁的条件是state为0,即全部签到完成,如果不为0,说明还没全部签到,当前线程走AQS流程加入阻塞队列等待CDL锁

1
2
3
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

1
2
3
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}