澳门新葡亰娱乐网站-www.142net-欢迎您

澳门新葡亰娱乐网站是因为你还没有找到一条正确的致富之路,www.142net是将所有的游戏都汇集在一起的官方平台,因为澳门新葡亰娱乐网站这个网站当中有着大量的游戏攻略,托IP定位技术,传达终端直接到达的精准传播方式。

澳门葡京真人娱乐:JDK源码分析,并发包同步工

来源:http://www.bhtsgq.com 作者:计算机知识 人气:66 发布时间:2019-05-30
摘要:前言 Exchanger应该算并签发承包合约辽宁中华南理哲高校程集团具使用相对少的,因为它至关心器重要用来线程之间沟通数据,它的用法比较轻便在不一致线程之间使用exchange方法调换数

前言

Exchanger应该算并签发承包合约辽宁中华南理哲高校程集团具使用相对少的,因为它至关心器重要用来线程之间沟通数据,它的用法比较轻便在不一致线程之间使用exchange方法调换数据,不过当中贯彻比相当漂亮妙,使用了unsafe的CAS原子操作、自旋来化解冲突难题,上边大家因此源码一探毕竟。

前言

CountDownLatch是三个密闭达成,它能够使一个依然三个线程等待1组事件时有产生。它包蕴二个计数器,用来代表需求等待的风浪数量,coutDown方法用于表示3个风浪时有发生,计数器随之递减,而await方法等待计数器为0事先平素不通。它是根据AQS的共享锁来达成的,个中使用了较多的AQS的主意,所以在那在此以前最佳读书过AQS的源码,能够查看本身此前AQS的源码深入分析,有些AQS方法未有在前头剖析过的这里涉及到了会议及展览开深入分析。

前言

此次分析连续信号量Semaphore,为何称之为能量信号量呢?是因为它能够垄断(monopoly)同有的时候间做客有个别能源的操作数量或然同期施行某些钦点操作的数目。就好比它像3个租借小车的营业所,租借公司的汽车的数额是定位的,用完必要偿还,用事先须要去租赁(acquire 前提是还有可用的汽车),假如小车都被租出去了,那只能等到人家归还了技术租到。它是依照AQS的共享锁来兑现的,当中使用了较多的AQS的艺术,所以在那前边最棒内需分析过AQS的源码,也能够查阅本人在此之前AQS的源码解析澳门葡京真人娱乐:JDK源码分析,并发包同步工具Semaphore。,有个别AQS方法未有在从前深入分析过的此处提到到了会开始展览剖判。

前言

CyclicBarrier它是何等?一个合伙援助类,它同意1组线程相互等待,直到达到某些公共屏障点。类似于爱人里面联系要在深夜聚个会,多少个朋友全体到齐后才起来饮酒吃菜。

它里面维护了三个计数值count,每当三个线程就绪时该计数值就减小一回,当count为0时表示全体线程就绪,此时唤起全部线程。

源码

先看看源码注释中有关基本算法的牵线

        for (;;) {
            if (slot is empty) { 
                // slot为空时,将item 设置到Node 中                   
                place item in a Node;
                if (can CAS slot from empty to node) {
                    // 当将node通过CAS交换到slot中时,挂起线程等待被唤醒
                    wait for release;
                    // 被唤醒后返回node中匹配到的item
                    return matching item in node;
                }
            } else if (can CAS slot from node to empty) { // release
                // 将slot设置为空
                // 获取node中的item,将需要交换的数据设置到匹配的item
                get the item in node;
                set matching item in node;
                // 唤醒等待的线程
                release waiting thread;
            }
            // else retry on CAS failure
        }

比如有二条线程A和B,A线程交流数据时,开掘slot为空,则将急需交流的数量放在slot中等待其余线程进来调换数据,等线程B进来,读取A设置的数码,然后设置线程B需求沟通的数码,然后提示A线程,原理正是这么轻松。当时当五个线程之间进行沟通数据时就晤面世难点,所以Exchanger参加了slot数组。

源码

小编们先看它的性质和构造器,

    // Sync为其内部类
    private final Sync sync;

    // 唯一的一个构造器
    // 构造参数count就是需要等待事件的数量
    public CountDownLatch(int count) {
        // 为了保证count >= 0
        if (count < 0) throw new IllegalArgumentException("count < 0");
        // 构造sync
        this.sync = new Sync(count);
    }

今昔来看里面类Sync,它继续了AQS,完结了共享锁方法,上面来看其源码,代码行数十分的少很好通晓

    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            // setState 为AQS更改其state变量的方法
            // 将AQS state变量设置成count
            setState(count);
        }

        int getCount() {
            // AQS的获取state锁状态值
            return getState();
        }
        // 尝试获取共享锁
        protected int tryAcquireShared(int acquires) {
            // 返回1表示此时锁状态值为0表示锁已释放
            // -1表示此时锁状态值大于0,表示出于锁定状态
            return (getState() == 0) ? 1 : -1;
        }
        // 尝试释放共享锁(计数器递减releases次)
        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            // 等待锁状态值为0或者更改锁状态值成功
            for (;;) {
                // 将state赋值给变量c
                int c = getState();
                if (c == 0)
                    // 此时锁已清除
                    return false;
                // 递减
                int nextc = c-1;
                // 比较state的状态值是否等于C,等于将state状态值改为nextc
                if (compareAndSetState(c, nextc))
                    // 更改成功后,如果nextc为0则返回true
                    return nextc == 0;
            }
        }
    }

源码

源码

Exchanger 属性及构造器

    // 用于左移Node数组下标,从而得出数据在内存中的偏移量来获取数据,避免伪共享
    private static final int ASHIFT = 7;
    // note数组最大下标
    private static final int MMASK = 0xff;
    // 用于递增bound,每次加一个SEQ
    private static final int SEQ = MMASK   1;
    // CPU核心数
    private static final int NCPU = Runtime.getRuntime().availableProcessors();
    // 当前数组最大的下标(多处理器情况下)
    static final int FULL = (NCPU >= (MMASK << 1)) ? MMASK : NCPU >>> 1;
    // 自旋次数,CPU核心为1个时,自旋被禁用
    private static final int SPINS = 1 << 10;
    // 空对象,用于当线程exchange方法中参数为null时传递给其他线程的对象
    private static final Object NULL_ITEM = new Object();
    // 用于超时时传递的对象
    private static final Object TIMED_OUT = new Object();
    // Participant 继承了ThreadLocal,也就是说该对象用于获取每条线程中存放的值
    private final Participant participant;
    // 多个线程交换
    private volatile Node[] arena;
    // 用于2个线程交换
    private volatile Node slot;
    // 该值主要用于与
    private volatile int bound; 
    // 通过unsafe用于CAS操作
    private static final sun.misc.Unsafe U;
    private static final long BOUND;
    private static final long SLOT;
    private static final long MATCH;
    private static final long BLOCKER;
    private static final int ABASE;
    static {
        int s;
        try {
            U = sun.misc.Unsafe.getUnsafe();
            Class<?> ek = Exchanger.class;
            Class<?> nk = Node.class;
            Class<?> ak = Node[].class;
            Class<?> tk = Thread.class;
            // bound属性在Exchanger对象中的偏移地址
            BOUND = U.objectFieldOffset
                (ek.getDeclaredField("bound"));
            // slot属性在Exchanger对象中的偏移地址   
            SLOT = U.objectFieldOffset
                (ek.getDeclaredField("slot"));
            // slot属性在Node对象中的偏移地址
            MATCH = U.objectFieldOffset
                (nk.getDeclaredField("match"));
           // parkBlocker属性在Thread对象中的偏移地址
            BLOCKER = U.objectFieldOffset
                (tk.getDeclaredField("parkBlocker"));
            // 获取Node[]数组中每个元素的大小,这里是4
            s = U.arrayIndexScale(ak);
            // ABASE absorbs padding in front of element 0
            // 获取Node[]数组中第一个元素的偏移地址   128
            ABASE = U.arrayBaseOffset(ak)   (1 << ASHIFT);

        } catch (Exception e) {
            throw new Error(e);
        }
        if ((s & (s-1)) != 0 || s > (1 << ASHIFT))
            // 这里是为了保证 Node数组中的元素不会争用一个缓存行
            throw new Error("Unsupported array scale");
    }

await方法

await方法就是当state状态值不为0时将近期线程阻塞,然后等待升迁

    public void await() throws InterruptedException {
        //调用的AQS获取共享锁可中断方法
        sync.acquireSharedInterruptibly(1);
    }

 

咱俩来看看AQS的acquireSharedInterruptibly方法

    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        // 此方法调用的是CountDownLatch内部类Sync的方法
        // 如果锁状态不为0,则执行doAcquireSharedInterruptibly方法
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

doAcquireSharedInterruptibly方法也是由AQS完毕的

    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        // 添加一个共享锁节点到队列
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            // 直到线程被唤醒或者线程被中断时跳出循环
            for (;;) {
                // node节点的前驱节点
                final Node p = node.predecessor();
                if (p == head) {
                    // 调用CountDownLatch内部类Sync的方法
                    // 如果锁状态值为0,则返回值大于0
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        // 当锁状态值为0,开始将note节点设置为头节点并唤醒后继节点
                        // 也就是队列不断的出列,然后唤醒后继节点,后继节点被唤醒后由于前驱节点被设置成头节点,又会调用该方法进行后继节点的唤醒
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }

                /*
                 shouldParkAfterFailedAcquire用于清除已中断/或者取消的线程以及判断此次循环是否需要挂起线程
                 parkAndCheckInterrupt 挂机当前线程
                 shouldParkAfterFailedAcquire 和 parkAndCheckInterrupt 在AQS之前博文里分析过这里就不再分析了
                 */
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                // 表示当前线程中断,取消获取锁
                // 之前分析过,略过源码分析
                cancelAcquire(node);
        }
    }

 

setHeadAndPropagate方法,主要成效是唤醒后继节点线程

    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; 
        // 当前节点设置为头节点,节点关联的线程设置为空
        setHead(node)
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                // 节点等待状态为signal时,唤醒后继节点线程
                doReleaseShared();
        }
    }

doReleaseShared很抢眼,当当前节点等待状态为signal时,唤醒后继节点线程

    private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                // 当前线程等待状态为signal时表示后继节点需要唤醒
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        // 表示h节点的状态替换失败,会再次循环判断h节点的状态
                        continue;            // loop to recheck cases
                    // 唤醒后继节点
                    unparkSuccessor(h);
                }
                // 状态为0时,将其改成PROPAGATE,更改失败会再次循环判断h节点的状态
          // 这种情况发生在一个线程调用await方法,节点的等待状态还是初始值0未来得及被修改,刚好state被置为0然后调用了doReleaseShared方法

                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

构造器和质量

    // 继承了AQS,并且有2个子类实现(非公平锁和公平锁)
    private final Sync sync;

    // 构造器只有许可数量permits的信号量
    public Semaphore(int permits) {
        // 非公平锁
        sync = new NonfairSync(permits);
    }
    // 构造器带有许可数量permitsy以及是否是公平锁
    public Semaphore(int permits, boolean fair) {
        // fair为true时,为公平锁否则非公平
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }    

CyclicBarrier属性和构造器

public class CyclicBarrier {
    // 互斥锁
    private final ReentrantLock lock = new ReentrantLock();
    // 条件等待
    private final Condition trip = lock.newCondition();
    // 参与者数目
    private final int parties;
    // 栅栏放开时的执行任务
    private final Runnable barrierCommand;
    // 用于表示栅栏是否放开或者重置
    private Generation generation = new Generation();
    // 在等待的参数者数目
    private int count;

    // 构造parties个参与者的栅栏
    public CyclicBarrier(int parties) {
        this(parties, null);
    }
    // 构造parties个参与者的栅栏,并且设置栅栏放开时要执行的任务
    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }   
}    

本文由澳门新葡亰发布于计算机知识,转载请注明出处:澳门葡京真人娱乐:JDK源码分析,并发包同步工

关键词:

最火资讯