博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
ReentrantReadWriteLock源码解析
阅读量:4209 次
发布时间:2019-05-26

本文共 9398 字,大约阅读时间需要 31 分钟。

之前我们学习了ReentrantLock锁的大概实现机制,其中的大boss是AQS,最终我们得出的结论是,AQS维护了锁的绝大多数操作。我们只需要使用它提供的方法即可实现锁的功能。那么ReentrantReadWriteLock(读写锁)是如何实现的呐,按照之前的说法,ReentrantReadWriteLock应该也是借助AQS来做吧,毕竟人家提供了那么多方法,不用白不用么。那么现在我们想一下如果让咋去设计一个读写锁,应该怎么设计?首先我们需要知道什么是读写锁。读写锁解决的什么问题?

我们知道在加锁的时候会导致线程的阻塞。也就是我们的多核CPU只能单核跑,其他的CPU只能排队。因此从CPU层面上说加锁的代价是不可忽视的。而锁的意义则是对代码块进行单核CPU处理,但是我们注意到在我们写代码的时候,有时候其实我们知道在什么条件是可以多线程跑的,什么时候是不可以的。那么问题就是如何让我们的代码在条件语句下控制自己的单线程和多线程的决策问题。

那么读锁的定义大概可以描述为:让副本代码块可以在不同的CPU上运行。写锁的定义就是:副本代码只能通过竞争选择唯一的一个进行运行。

但是鄙人任然不理解读锁存在的理由,因为我们可以让多线程去跑,然后让每个线程在加了读锁的指令位置去判断是否有写锁的标志,如果有写锁的标志那么就阻塞等待。直到写锁释放为止。假设一个线程获取了写锁,但是在写锁的代码块中去调用运行被其他线程加了写锁的代码。那么必然会发生死锁情况。突然感觉自己跑偏了。。。咋不是在讨论读锁存在的理由么,为啥讨论到了死锁,,,ZZ;好了,咋还是重新来,如如果一个线程在正常运行,然后获取写锁的线程修改了已经被运行的代码中的数据,那么就会产生脏数据。所以说咋面对这种问题咋还不得不加个锁,就是说读锁。加了读锁的线程写锁是不能操作的。加了写锁之后,只有获得写锁的线程可以操作。

获得了读锁,那么如何升级为写锁?java中的读写锁是不允许这种操作的。为啥?这个就比较简单了,因为多个线程获取的读锁,然后都知道了数据值,然后一个线程升级为写锁然后操作了一波,那么你让人家其他线程如何信任你。。那么世界就乱了。

获得了写锁,那么如何降级为读锁?在java中是可以这么操作的,在拥有写锁之后,然后要变成读锁需要先获取读锁,获取读锁之后才能释放写锁。为什么不能直接释放写锁,然后去申请读锁呐?因为在你释放之后又可能被其他线程获取到了写锁。

通过查看ReentrantReadWriteLock类的体系结构,我们看到主要有Syn、NofairSyn、FairSyn、readlock和writeLock。其中Syn继承了AQS,是线程同步队列。也是终极大boss,nofairSyn和公平锁FairSyn是两种不同的锁机制。主要在于唤醒机制不同。ReentrantReadWriteLock的类结构如

其中方法基本都是操作Syn来进行的。因此解析Syn成为我们必须要面对的问题。

从源码中我们可以看到Sync继承了传说中的AQS,也就是AQS提供的线程排序加入等待都是采用的AQS的。

//读写锁采用32位作为读写锁,将读锁和写锁放到一个整形变量里。为啥要这么设计?在之前学习重入锁ReenTrantLock的时候我们知道锁维护的是一个state的整形变量,那么读写锁是否是为了分开读写锁而将state进行了分开处理,其本质也是为了复用?假如用两个整形进行维护会有什么问题?好像没有什么问题,但是我们利用state变量的可能会有一点问题,所以我估计这里采用32位的高低不同位数就是这个原因。        static final int SHARED_SHIFT   = 16;        //左移16位        static final int SHARED_UNIT    = (1 << SHARED_SHIFT);        //对大的锁个数        static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;        //独占锁的最大个数        static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;        //通过state变量的右移16为获取读锁个数        /** Returns the number of shared holds represented in count  */        static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }        //通过state与最大的独占锁取与,获取独占锁的个数        /** Returns the number of exclusive holds represented in count  */        static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

上述代码主要用来记录锁的个数。通过把原来的state的分开使用达到原来的独占锁的用途扩展。

由于读写锁肯定代码量比较多,而且逻辑性特别强,所以我还是决定先从开始的时候进行起步。我们在使用读写锁的时候,一般是lock方法。那么我们就看lock和unlock方法。于是我将目标类定位到了readLock类。

我们看到调用了同步类sync的申请共享锁的方法。

public final void acquireShared(int arg) {        if (tryAcquireShared(arg) < 0)            doAcquireShared(arg);    }

  首先调用try方法,但是当我们打开tryAcquireShared方法的时候,发现什么也没有。那么按照子类和父类的关系,那么该方法的实现必然在子类中。

protected final int tryAcquireShared(int unused) {             //获取当前线程            Thread current = Thread.currentThread();            //获取当前线程的锁状态            int c = getState();            //判断独占锁是否存在,如果已经被独占了,那么直接返回-1,然其调用doAcquireShared,那么doAcquireShared必然就是一个尝试的+排队的方法            if (exclusiveCount(c) != 0 &&                getExclusiveOwnerThread() != current)                return -1;                //获取读锁的数量            int r = sharedCount(c);            //该读锁是否应该被阻塞,感觉这里应该与写锁的判断有关,然后读锁的数量要小于最大的锁数量。这些条件成立之后将读锁CAS设置到state中。            if (!readerShouldBlock() &&r < MAX_COUNT &&compareAndSetState(c, c + SHARED_UNIT)) {                if (r == 0) {                     //如果是第一个读锁线程。                    firstReader = current;                    //不懂这个holdCount难道只是为了记录第一次读锁的数量?有什么意识里?一脸黑人问号。有什么目前的格局搞不定的东西存在吗?                    firstReaderHoldCount = 1;                } else if (firstReader == current) {                    //这里想都不用想是重入的情况啊                    //好吧,上边的ReadHoldCount就是为了记录重入锁的次数啊,之前ReenTrantLock记得好像是直接用的一个变量与state进行加减法操作。现在要分开,那么read锁和write锁必然都有这样一个变量。然后他们的逻辑是不一样的                    firstReaderHoldCount++;                } else {                //从缓存中获取获取锁的数量?这个缓存又是什么鬼?下边的这些逻辑是怎么得?                //这个counter好像也没写什么啊,呃呃呃,,突然发现有真么一个类,看下边ThreadLocalHoldCounter,这个类继承了ThreadLocal,那么意思这块还是跟非第一个读线程的重入有关系啊                    HoldCounter rh = cachedHoldCounter;                    if (rh == null || rh.tid != getThreadId(current))                        cachedHoldCounter = rh = readHolds.get();                    else if (rh.count == 0)                        readHolds.set(rh);                    rh.count++;                }                return 1;            }            //当排不上档次的时候,就开始胡弄了(特殊的通道没有了,,,)。            return fullTryAcquireShared(current);        }

       

     

static final class ThreadLocalHoldCounter            extends ThreadLocal
{ public HoldCounter initialValue() { return new HoldCounter(); } }

   

//复杂情况下尝试获取读锁        final int fullTryAcquireShared(Thread current) {            HoldCounter rh = null;            //一顿自旋到怀疑人生            for (;;) {                //获取锁状态                int c = getState();                //如果已经有独占锁了                if (exclusiveCount(c) != 0) {                    //如果独占锁不是我自己,那么返回-1,让去调用doAcquiredShared方法,排队去吧你。这里好像执行两次这样的判断,why?                    if (getExclusiveOwnerThread() != current)                        return -1;                  //没有独占锁那么判断是否被其他线程占用,这里好像又进行了一次判断。真的是一步也不能松懈啊                } else if (readerShouldBlock()) {                                            if (firstReader == current) {                                            } else {                        //如果不是当前线程,那就去排对吧                        if (rh == null) {                            rh = cachedHoldCounter;                            if (rh == null || rh.tid != getThreadId(current)) {                                rh = readHolds.get();                                if (rh.count == 0)                                    readHolds.remove();                            }                        }                        if (rh.count == 0)                            return -1;                    }                }                //上边线判断是否有独占锁,然后判断是否被其他线程占用。既没有占用然后也不应该阻塞。那么就是它自己了。                //判断是否超出了容量                if (sharedCount(c) == MAX_COUNT)                    throw new Error("Maximum lock count exceeded");                //cas进行state变量的修改                if (compareAndSetState(c, c + SHARED_UNIT)) {                    if (sharedCount(c) == 0) {                        firstReader = current;                        firstReaderHoldCount = 1;                    } else if (firstReader == current) {                        firstReaderHoldCount++;                    } else {                        if (rh == null)                            rh = cachedHoldCounter;                        if (rh == null || rh.tid != getThreadId(current))                            rh = readHolds.get();                        else if (rh.count == 0)                            readHolds.set(rh);                        rh.count++;                        cachedHoldCounter = rh; // cache for release                    }                    //终于是它自己执行了。。。                    return 1;                }            }        }

代码读到这里,就有一点疑问了。我们这里只是用那个cache来做重入的次数统计,好像并没有用唉。cas的时候好像也直接操作的是state啊

怀着疑问,我们继续挺进那个doAcquireShared,通过查看源码,发现和我们之前看过的doAcquire没啥子区别。回头看看那个cache,感觉这个cache应该是用来记录非第一个读线程重入次数。因为读锁是公开的,每个线程都可以重入。但是每个线程最后都得释放,你也不能把别人的读重入释放掉,那么就用cache做个记录吧。这个解释很完美。我觉得人家就是考虑这个才这么设计的。至于为啥要把要把第一个读线程特殊对待?zz,我不知道。或许我后边会知道的。需要一点时间

   

/**     * Acquires in shared uninterruptible mode.     * @param arg the acquire argument     */    private void doAcquireShared(int arg) {        final Node node = addWaiter(Node.SHARED);        boolean failed = true;        try {            boolean interrupted = false;            for (;;) {                final Node p = node.predecessor();                if (p == head) {                    int r = tryAcquireShared(arg);                    if (r >= 0) {                        setHeadAndPropagate(node, r);                        p.next = null; // help GC                        if (interrupted)                            selfInterrupt();                        failed = false;                        return;                    }                }                if (shouldParkAfterFailedAcquire(p, node) &&                    parkAndCheckInterrupt())                    interrupted = true;            }        } finally {            if (failed)                cancelAcquire(node);        }    }

解析了加锁的过程,那么解锁的过程就是state的-1操作了。可是我觉得我们有必要从另一个角度来看看这个cache咋用的。所以我们直接去看unlock的cache的代码吧!

       

protected final boolean tryReleaseShared(int unused) {            Thread current = Thread.currentThread();            if (firstReader == current) {                //如果当前线程是第一个读线程                if (firstReaderHoldCount == 1)                    //而且第一个读线程的重入次数为1,那么直接释放吧                    firstReader = null;                else                    //第一个读线程的重入进行释放                    firstReaderHoldCount--;            } else {                //如果不是第一个读线程,那么从cahce中获取,然后释放它的重入锁                HoldCounter rh = cachedHoldCounter;                if (rh == null || rh.tid != getThreadId(current))                    rh = readHolds.get();                int count = rh.count;                if (count <= 1) {                    readHolds.remove();                    if (count <= 0)                        throw unmatchedUnlockException();                }                --rh.count;            }            for (;;) {                int c = getState();                int nextc = c - SHARED_UNIT;                if (compareAndSetState(c, nextc))                    //锁释放完毕之后将状态设置到state中。                    return nextc == 0;            }        }

 通过阅读释放读锁的代码,发现我们的想法没有什么问题。

转载地址:http://xmkmi.baihongyu.com/

你可能感兴趣的文章
PHP在变量前面加&是什么意思?
查看>>
ebay api - GetUserDisputes 函数
查看>>
ebay api GetMyMessages 函数
查看>>
php加速器 - zendopcache
查看>>
手动12 - 安装php加速器 Zend OPcache
查看>>
set theme -yii2
查看>>
yii2 - 模块(modules)的view 映射到theme里面
查看>>
yii2 - controller
查看>>
yii2 - 增加actions
查看>>
php图像处理函数大全(缩放、剪裁、缩放、翻转、旋转、透明、锐化的实例总结)
查看>>
magento url中 uenc 一坨编码 base64
查看>>
强大的jQuery焦点图无缝滚动走马灯特效插件cxScroll
查看>>
Yii2.0 数据库查询
查看>>
yii2 db 操作
查看>>
mongodb group 有条件的过滤组合个数。
查看>>
关于mongodb的 数组分组 array group
查看>>
MongoDB新的数据统计框架介绍
查看>>
mongodb 增加全文检索索引
查看>>
symfony
查看>>
mysql数据库主从同步的问题解决方法
查看>>