AQS简述

AQS即类AbstractQueuedSynchronizer,抽象队列同步器,指的是java构建锁和同步组件的基础框架。
JUC包中的 java.util.concurrent.locks.AbstractQueuedSynchronizer是一个抽象类,本身没有实现任何的同步接口,只是定义了同步状态,以及同步状态的获取和释放。
实际运用中通过继承它的方式来实现同步功能,一般是某种同步组件的静态内部类继承它,所以说aqs的实现类以组合的形式存在于同步组件,一同来实现同步功能。

AQS主要结构

AQS主要有以下的属性和方法,还有一个内部类Node

  1. 内部类Node, 它是一个双向链表节点,表征一个阻塞的线程

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    static final class Node {
    static final Node SHARED = new Node();
    static final Node EXCLUSIVE = null;
    static final int CANCELLED = 1;
    static final int SIGNAL = -1;
    static final int CONDITION = -2;
    static final int PROPAGATE = -3;
    volatile int waitStatus;
    volatile Node prev;
    volatile Node next;
    volatile Thread thread;
    Node nextWaiter;
    final boolean isShared() {
    return nextWaiter == SHARED;
    }

    final Node predecessor() throws NullPointerException {
    Node p = prev;
    if (p == null)
    throw new NullPointerException();
    else
    return p;
    }

    Node() { // Used to establish initial head or SHARED marker
    }

    Node(Thread thread, Node mode) { // Used by addWaiter
    this.nextWaiter = mode;
    this.thread = thread;
    }

    Node(Thread thread, int waitStatus) { // Used by Condition
    this.waitStatus = waitStatus;
    this.thread = thread;
    }
    }
  2. 一个Head一个tail, 就是一个队列的头尾,也就是说AQS内部有一个阻塞线程的队列

    1
    2
    private transient volatile Node head;
    private transient volatile Node tail;
  3. 一个volatile的state, 表示同步的状态,0为可用,1为被锁,以及它的get/set

    1
    2
    3
    4
    5
    6
    7
    private volatile int state;
    protected final int getState() {
    return state;
    }
    protected final void setState(int newState) {
    state = newState;
    }
  4. 一个state的cas方法

    1
    2
    3
    protected final boolean compareAndSetState(int expect, int update) {
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

AQS类本身就这么点东西,重要的是它的几个实现以及相关的同步组件。
查看AQS的子类,可以看到就如下几个
image.png

JUC常用的同步组件及应用场景

上文中可以看到,AQS主要用于各个同步组件中实现同步功能。
这里把常用的一些异步组件过一遍,看看它们中的AQS实现以及本身的应用场景。

  • ReentrantLock

ReentrantLock,可重入锁,里面的静态内部类Sync继承了AQS, Sync本身还是个抽象类,然后FairSync, NonfairSync又分别继承了Sync,分别实现了公平锁和非公平锁,因此ReentrantLock有公平锁和非公平锁两种实现,通过构造方法传入的boolean可以区分,默认是非公平锁,因为效率较高。
ReentrantLock可以用在用synchronized的场景,做线程同步。由于jdk优化后,synchronized一样可以cas, 甚至于java都建议多用synchronized了,但是它的优点在于它是一套api,所以比synchronized灵活,比如以下情况:

  1. 和synchronized一样用来同步

    1
    2
    3
    4
    5
    6
    7
    8
    // 最土的用法
    private ReentrantLock lock = new ReentrantLock(true); //公平锁,或者非公平锁
    try {
    lock.lock(); //如果被其它资源锁定,会在此等
    //操作
    } finally {
    lock.unlock();
    }
  2. 多线程竞争,发现有人获得锁开始执行了,其它的就不执行了,或者等一会儿再看看

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    // tryLock, 如果获得不了锁,整个就算了。
    ReentrantLock lock = new ReentrantLock();
    if (lock.tryLock()) { //尝试获取锁,或者用 lock.tryLock(5, TimeUnit.SECONDS),尝试不成功后再试一会儿
    try {
    //XXXXX操作
    } finally {
    lock.unlock();
    }

    }
  3. 可中断锁,获得锁开始操作,但是做一半可以中断的情况, 其它线程调用interrupt方法后,它抛出一个InterruptedException,就中断了

    1
    2
    3
    4
    5
    6
    7
    8
    9
    ReentrantLock lock = new ReentrantLock();
    try {
    lock.lockInterruptibly();// 获得一个可中断锁
    //xxxxx操作
    } catch (InterruptedException e) {
    e.printStackTrace();
    } finally {
    lock.unlock();
    }
  • ReentrantReadWriteLock

和ReentrantLock一模一样,AQS在其中有公平锁和非公平锁两个实现。
ReentrantReadWriteLock和ReentrantLock的区别在于,ReentrantReadWriteLock里面维护了2个锁,一个read读锁,一个write写锁,为的是针对大多数情况读而少数写的情况,这种情况下读用的读锁是共享锁,写用的写锁是排他锁,而ReentrantLock只有一个排他锁。

  • Semaphore

同上,AQS在其中实现公平锁和非公平锁
Semaphore 信号量,一般来说锁只允许一个线程获得,而信号量允许多个线程同时获得资源,通过构造函数可以制定同时允许的线程数,因此它用于限制同时间能获取资源的线程数。
为了做到这点,Semaphore里的AQS的实现类Sync中的state,维护的是信号量个数
比如最常见的例子,银行有5个柜台能办理业务,但是有100个人等待办理,这100个人要是排队,就是公平锁,靠本事插队,就是非公平锁

1
2
3
4
5
6
7
8
9
10
11
12
13
Semaphore semaphore = new Semaphore(5, true); // 最多5个,公平锁
try {
semaphore.acquire(1);//获取一个许可
//do sth
return par1+"获取一个许可";
}catch (InterruptedException e){
e.printStackTrace();
return "获取许可失败";
}catch (Exception e1){
return "获取许可失败";
}finally {
semaphore.release(1);//处理完后,释放一个许可
}
  • CountDownLatch

CountDownLatch,从字面意思上看是计时门闩,门闩的地方负责阻塞线程,所以它用于需要一组线程等待彼此都到达门闩处的情况。
CountDownLatch只有一个构造方法,传入一个int, 也就是AQS里的state, 它表征拦截的线程数量,即AQS中阻塞队列的长度,同时也是倒计时的数字。
CountDownLatch的主要方法有
await() - 阻塞当前线程,计数至0之前一直等待,除被中断
countDown() - 计数器-1
曾经看到群里有人遇到的面试题,如何将异步处理的结果做聚合,例如多个线程去不同的地方取数据,然后做统一运算返回,这种情况就可以用CountDownLatch。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
CountDownLatch latch = new CountDownLatch(5); // 拦5个线程
for (int i=0; i<5; i++) { // 创建5个线程开始跑,每个睡5秒之后打印一句
new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(5000);
System.out.println(Thread.currentThread().getName() + " 运行完了");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
latch.countDown(); // 跑完之后计数 -1, 当5个线程都跑完了,计数器就归0了
}
}
}).start();
}

latch.await(10, TimeUnit.SECONDS); // 把当前线程阻塞了,等其它线程跑完
System.out.println("主线程开始做事了!");
/*
输出结果
Thread-1 运行完了
Thread-5 运行完了
Thread-2 运行完了
Thread-3 运行完了
Thread-4 运行完了
主线程开始做事了!

可见主线程是等着其它线程都跑完了之后才开始干事的, 如果把代码中的CountDownLatch部分注释掉,就是这种结果了
主线程开始做事了!
Thread-3 运行完了
Thread-2 运行完了
Thread-4 运行完了
Thread-1 运行完了
Thread-5 运行完了
*/

  • CyclicBarrier

字面意思,可重复使用的栅栏。功能可以理解为可以一堆CountDownLatch连接起来,CountDownLatch只拦截一次,然后放行,CyclicBarrier可以循环地阻塞拦截。
可以说CyclicBarrier比CountDownLatch更强大。
不同的是,CyclicBarrier中的线程没有区分等人的和被等的,大家都是互相等,另外,不需要显示地去countDown.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
CyclicBarrier b = new CyclicBarrier(5); // 拦5个线程
for (int i=0; i<5; i++) {
new Thread(new Runnable() {
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName() + " 到达栅栏1开始等待");
b.await();
System.out.println(Thread.currentThread().getName() + " 完成工作1,到达栅栏2开始等待");
b.await();
System.out.println(Thread.currentThread().getName() + " 开始工作2");
} catch (Exception e) {
e.printStackTrace();
} finally {

}
}
}).start();
}

/*
结果:
Thread-1 到达栅栏1开始等待
Thread-3 到达栅栏1开始等待
Thread-2 到达栅栏1开始等待
Thread-4 到达栅栏1开始等待
Thread-5 到达栅栏1开始等待
Thread-5 完成工作1,到达栅栏2开始等待
Thread-1 完成工作1,到达栅栏2开始等待
Thread-2 完成工作1,到达栅栏2开始等待
Thread-3 完成工作1,到达栅栏2开始等待
Thread-4 完成工作1,到达栅栏2开始等待
Thread-4 开始工作2
Thread-5 开始工作2
Thread-1 开始工作2
Thread-3 开始工作2
Thread-2 开始工作2
*/

综述

AQS抽象队列同步器,定义了同步状态,阻塞线程队列,以及同步状态设置方法。
JUC包里的同步组件们组合各种AQS的实现类,实现同步的功能。
同步组件功能比synchronized关键字更加丰富和灵活,不同的同步组件可以应对不同的使用场景。

Donate
  • Copyright: Copyright is owned by the author. For commercial reprints, please contact the author for authorization. For non-commercial reprints, please indicate the source.

扫一扫,分享到微信

微信分享二维码

请我喝杯咖啡吧

微信