淘先锋技术网

首页 1 2 3 4 5 6 7

在并发编程的大军中,有两个阵营,一个就是synchronized内置关键字实现同步的阵营,另外一个就是这次要说的采用AQS框架实现同步的阵营,可以去juc并发包里去看看这些类的实现方式。

那么什么是AQS呢?我理解的AQS其实就是维持了一个先进先出的FIFO队列,然后各种并发实现类继承AQS抽象类来实现同步操作,那么下面我们通过几个图来看一下AQS的机制

    //同步标识位
    private volatile int state;

    //获取标识位
    protected final int getState() {
        return state;
    }

    //放置标识位
    protected final void setState(int newState) {
        state = newState;
    }

    //通过CAS方式来变更同步标识
    protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

上边的state就是同步状态,也就是咱们通常说的锁的状态,可以通过setState,getState和CAS来获取和修改这个状态值,那么在多线程中就意味着在获取独占锁的时候,多个线程会同时通过CAS方式来尝试修改这个state状态,哪个线程能成功修改这个值,使之变为1,谁就成功的获取到了锁,并执行业务逻辑,其他线程进入FIFO队列等待,当获取锁的线程执行完毕后,会将这个state变为0,并唤醒队列中下一个等待的线程(如下图)。

AQS中的FIFO队列为一个双向链表,每个Node节点都代表了一个正在等待的线程,其中比较重要的参数就是waitStatus

** Marker to indicate a node is waiting in shared mode */
        //表示线程以共享的模式等待锁
        static final Node SHARED = new Node();
        //表示线程以独占的模式等待锁
        static final Node EXCLUSIVE = null;

        //表示线程的获锁请求已经“取消”
        static final int CANCELLED =  1;
        //线程可被唤醒状态(正常可被唤醒状态)
        static final int SIGNAL    = -1;
        //线程处于condition队列中状态,等待满足某种条件时可被唤醒
        static final int CONDITION = -2;
        //当线程处在“SHARED”模式时,该字段才会被使用
        static final int PROPAGATE = -3;

        //表示线程在队列中的状态,状态值为上边几种
        volatile int waitStatus;

现在来跟踪一下ReentrantLock中的lock和unlock源码方法,来看看AQS的流程是怎么样的

    private final Sync sync;

    //内部定义了一个Sync静态抽象类来继承AQS
    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -5179523762034025860L;

        //抽象了lock的方法接口
        abstract void lock();

        //非公平方式尝试获取锁
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

        //释放锁
        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }
        //判断当前线程是否获取了独占锁
        protected final boolean isHeldExclusively() {
            
            return getExclusiveOwnerThread() == Thread.currentThread();
        }

        //定义了条件队列
        final ConditionObject newCondition() {
            return new ConditionObject();
        }

        
        //获取持有独占锁的线程
        final Thread getOwner() {
            return getState() == 0 ? null : getExclusiveOwnerThread();
        }
        
        
        final int getHoldCount() {
            return isHeldExclusively() ? getState() : 0;
        }
        
        //判断是否已经加锁
        final boolean isLocked() {
            return getState() != 0;
        }

    }

    //继承Sync的非公平队列
    static final class NonfairSync extends Sync {...}

    //继承Sync的公平队列
    static final class FairSync extends Sync {...}

上边是ReentrantLock的大概结构,就是一个Sync类实现了AQS抽象类,然后再衍生了两种模式,非公平和公平模式,ReentrantLock默认初始化就是非公平模式

 //调用ReentrantLock的lock方法,里边调用sync的获取锁方法
 (1)public void lock() {sync.lock();}

 //由上图源码可知,Sync的lock只是个抽象接口,需要再NonFairSync/FairSync中找实现方式
 (2)abstract void lock();

  //NonFairSync实现lock方式
 (3)final void lock() {
            //先使用CAS方式尝试一次修改state值(获取锁)
            if (compareAndSetState(0, 1))
                //若成功则设置当前线程为持有锁的线程
                setExclusiveOwnerThread(Thread.currentThread());
            else
                //独占方式获取同步状态
                acquire(1);
        }

 (4)
    public final void acquire(int arg) 
    {
        //如果再次尝试获取锁失败则将当前线程加入队列
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            
            selfInterrupt();
    }
    
    AQS中的模板方式,这是一个钩子,实现方式还是在ReentrangLock中
    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }
    //NonFairSync中的实现方式
    protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }

    final boolean nonfairTryAcquire(int acquires) {
            //获取当前线程
            final Thread current = Thread.currentThread();
            //获取当前state值
            int c = getState();
            //若为0,则说明当前锁已经被释放,不死心,想再尝试争夺一下锁
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            //若当前线程为持有锁的线程,再次加锁(可重入锁的实现重点)
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                //实现可重入其实就是state+1,哈哈,但是在释放锁的时候,要确保最后state为0
                setState(nextc);
                return true;
            }
            return false;
        }

    将当前线程加入FIFO队列 
    private Node addWaiter(Node mode) {
        //创建一个node节点,把线程放到节点中,设置节点属性为独占
        Node node = new Node(Thread.currentThread(), mode);
        //从队列尾部放入节点
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        //若上边不成功则以死循环方式放置node节点
        enq(node);
        return node;
    }

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            //节点以自旋的方式观察当前队列中的状态
            for (;;) {
                //获取当前节点的前置节点
                final Node p = node.predecessor();
                //若前置节点为头节点且成功获取到锁
                if (p == head && tryAcquire(arg)) {
                    //设置头节点为当前节点
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    //判断前置节点的状态是不是正常可被唤醒的状态
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        //若前置节点是可被唤醒的状态,返回true
        if (ws == Node.SIGNAL)
            return true;
        if (ws > 0) {
            //若前置节点的状态是被取消的状态,则将前置节点丢出FIFO队列,然后将当前节点的前置指针指向它前面第一个waitStatus小于0的node节点
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            //CAS方式尝试将前置节点的waitStatus设置为SIGNAL
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

    private final boolean parkAndCheckInterrupt() {
        //阻塞当前线程,等待被唤醒
        LockSupport.park(this);
        return Thread.interrupted();
    }

简单的讲一下Lock方法的非公平实现,意思就是尝试获取锁的时候,用CAS尝试修改state值,若不成功,AQS就会将当前线程包装成Node节点,然后扔进FIFO队列,并将当前线程线程用LockSupport.park方法阻塞住,同时维持一个自旋判断,直到当前线程能够成功获取到锁的时候,跳出自旋,同时有一些线程中断的判断,若当前线程中断了,也会跳出循环。那么如果跟踪源码可以发现,大多数主体方法是在AQS抽象类中完成的,只有tryAcquire方法是在具体实现类中实现,AQS只是提供了一个钩子用于调用子类中的实现方法,这就是AQS中重要的设计模式之一  ---模板模式

那么上边一系列的Lock实现只是非公平锁的实现,那么公平锁其实跟非公平锁的方式大体逻辑是一样的,区别在于非公平锁模式下线程会先去CAS尝试修改一次state值,如果失败了才会进入FIFO队列,那么公平模式就是直接进入FIFO队列去排队,所以非公平锁比公平锁效率高的地方就在于此,因为将线程维护到FIFO队列里也是吃CPU资源的,但是缺点是在极端情况下可能会造成线程饥饿的情况,就是FIFO队列中的线程一直处于阻塞状态下获取不到锁。

再来看一下Unlock在AQS中的实现方式,这里依然拿ReententLock举例

//调用AQS的释放锁方法,这个1就是state需要减去的值,加锁的时候加了1,释放的时候减1
public void unlock() {sync.release(1);}

//释放方法
public final boolean release(int arg) {
        //首先尝试释放锁,这里依然是模板方式的钩子,调用子类的实现方式
        if (tryRelease(arg)) {
            //找到头节点,其实还是当前这个线程
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }


protected final boolean tryRelease(int releases) {
            //获取当前线程的state值,减1
            int c = getState() - releases;
            //判断一下当前线程是不是持有锁的线程,如若不是,抛异常
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            //如果state变成0了,当前线程本身就意味不需要再释放锁了,共享锁的时候,这里的state值就不一定是1了,需要循环释放锁,直到state变成0
            if (c == 0) {
                free = true;
                //将独占锁线程标识置空
                setExclusiveOwnerThread(null);
            }
             //设置新的state值,这里是0
            setState(c);
            //返回是否释放成功的标识
            return free;
        }

//唤醒头节点之后的下一个没有被取消的节点,那么如果头节点的下一个节点被取消了,则从队列尾部往前找最靠近头节点的那个没有被取消的节点,来执行unpark唤醒线程,重新去争夺锁
private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

其实这就是释放锁的逻辑,然后下一个线程如果争夺到了锁之后,就会将头节点指向获取锁的线程,上一个执行完锁的头节点对象此时就没有任何的引用了,等待GC回收就可以了。

如上两图就是在执行unlock时,FIFO队列中的节点情况

附录ReentrantLock加锁流程图和解锁流程图

加锁流程图

 解锁流程图