长沙网站营销承德市住房和城乡建设局官网

当前位置: 首页 > news >正文

长沙网站营销,承德市住房和城乡建设局官网,泰安人才网,阿里云域名注册服务网站文章目录 AtomicLong实现原理递增和递减操作代码总结 LongAdder实现原理实现原理LongAdder 代码分析构造方法sum方法reset方法sumThenReset方法longValue方法add 方法longAccumulate 方法 总结 JUC 包提供 了一系列的原子性操作类#xff0c;这些类都是使用非阻塞算法 CAS 实现… 文章目录 AtomicLong实现原理递增和递减操作代码总结 LongAdder实现原理实现原理LongAdder 代码分析构造方法sum方法reset方法sumThenReset方法longValue方法add 方法longAccumulate 方法 总结 JUC 包提供 了一系列的原子性操作类这些类都是使用非阻塞算法 CAS 实现的 相比使用锁实现原子性操作这在性能上有很大提高。本篇主要以 AtomicLong 为例进行原子操作类的讲解找到原子操作类的不足并提出解决方案。 AtomicLong实现原理 AtomicLong 是原子性递增或者递减类其内部使用 Unsafe 来实现由于AtomicLong 也是在rt包下都是由BootStarp 类加载器加载所以获取Unsafe 时直接使用 Unsafe.getUnsafe ( 方法就可以获取到。 递增和递减操作代码 // ( 6 调用 unsafe方法 原子性设置value值为原始值1 返回值为递增后的值 public final long incrementAndGet() { return unsafe.getAddAddLong(this, valueOffset , lL) 1L ; } // ( 7 调用 unsafe方法原子性设置 value直为原始值 1 返回值为递减之后的值 public final long decrementAndGet() { return unsafe . getAddAddLong(this, valueOffset , - lL) - 1L ; } // (8 调用 unsafe方法原子性设置value值为原始值1 返回值为原始值 public final long getAndincrement() { return unsafe .getAndAddLong(this , valueOffset , 1L) ; } // ( 9 调用 unsafe方法原子性设置 value值为原始值 1 返回位为原始值 public final long getAndDecrement( ) { return unsafe.getAndAddLong (this , valueOffset - 1L) ; } 在如上代码内部都是通过调用 Unsafe 的 getAndAddLong 方法来实现操作这个函数 是个原子性操作这里第一个参数是 AtomicLong 实例的引用 第二个参数是 value 变量在 AtomicLong 中的偏移值第三个参数是要设置的第二个变量的值。 其中 getAndlncrement 方法在 JDK 7 中的实现逻辑为 public final long getAndincrement () { while (true) { long current get() ; long next current 1 ; if (compareAndSet(current , next)) return current ; } } 在如上代码中每个线程是先拿到变量的当前值由于 value 是 volatile 变量所以这 里拿到的是最新的值然后在工作内存中对其进行增 1 操作 而后使用 CAS 修改变量的值。如果设置失败则循环继续尝试 直到设置成功 。 而 JDK8 中的逻辑为 public final long getAndAddLong(Object paramObject , long paramLongl , long paramLong2) { long l ; do { l getLongvolatile(paramObject , paramLongl) ; ) while (!compareAndSwapLong(param0bject , paramLong1 , 1, 1 paramLong2) ); return l ; } 可以看到 JDK 7 的 AtomicLong 中的循环逻辑已经被 JDK 8 中的原子操作类 UNsafe 内置了 之所以内置应该是考虑到这个函数在其他地方也会用到而内 置可以提高复用性 。 public final boolean compareAndSet(long expect , long update) { return unsafe.compareAddSwapLong ( this , valueOffset , expect , update) ; } 由如上代码可知在内部还是调用了 unsafe.compareAndSwapLong 方法 。 如果原子变量中的 value 值等于expect则使用 update 值更新该值并返回 true否则返回 false 。 总结 经过上面代码的介绍我们发现cas的实现是依赖于 unsafe 实现的在进行递增递减时会采用循环的方式去cas更新原始值。这样的好处是不需要使用锁进行阻塞来保障数据安全直接利用硬件自带的比较替换方式更新数据相对来说更加轻量级。但是如果在高并发下这种比较替换会十分频繁导致CPU不断被占用进而导致监控中出现CPU使用告警。为解决这个问题在JDK8中出现了LongAdder。 LongAdder实现原理 既然 AtomicLong 的性能瓶颈是由于过 多 线程同时去竞争一个变量的更新而产生的那么如果把一个变量分解为多个变量让同样多的线程去竞争多个资源 是不是就解决了性能问题是的 LongAdder 就是这个思路 。 实现原理 使用 LongAdder 时则是在内部维护多个 Ce ll 变量每个 Cell 里面有一个初始值为 0 的 long 型变量这样在同等并发量的情况下争夺单个变量更新操作的线程量会减少这变相地减少了 争夺共享资源的并发量。另 外多个线程在争夺同一个 Cell 原子变量时如果失败了 它并不是在当前 Cell 变量上一直自旋 CAS 重试而是尝试在其他 Cell 的变量上进行 CAS 尝试 这个改变增加了当前线程重试 CAS 成功的可能性 。最后在获取 LongAdder 当前值时 是把所有 Cell 变量的 value 值累加后再加上 base返回的 。 LongAdder 维护了 一个延迟初始化的原子性更新数组默认情况 下 Cell 数组是 null 和 一个基值变量 base 。 由于 Cells 占用的内存是相对比较大的所以一开始并不创建它而是在需要时创建也就是惰性加载。 当一开始判断 Cell 数组是 null 并且并发线程较少时所有 的 累加操作都是对 base 变量进行的 。 保持 Ce ll 数组的大小为 2 的 N 次方在初始化时 Cell 数组中的 Cell 元素个数为 2数组里面的变量实体是 Cell 类型。 Cell 类型是 AtomicLong 的一个改进用来减少缓存的争用也就是解决伪共享问题 。 对于大多数孤立的多个原子操作进行字节填充是浪费的因为原子性操作都是无规律地分散在内存中的 也就是说多个原子性变量的内存地址是不连续 的 多个原子变量被放入同 一个缓存行的可能性很小 。 但是原子性数组元素的内存地址是连续的所以数组内的 多个元素能经常共享缓存行因此这里使用 sun.misc.Contended 注解对Cell 类进行字节填充这防止了 数组中多个元素共享一个缓存行在性能上是一个提升。 LongAdder 代码分析 下面围绕以下话题从源码角度来分析 LongAdder 的实现 LongAdder 类继承自 Striped64 类在 Striped64 内部维护着三个变量。LongAdder 的真实值其实是 base 的值与 Cell 数组里面所有 Cell 元素中的 value 值的累加base 是个基础值默认为 0 。 cellsBusy 用来实现自旋锁状态值只有 0 和 l 当创建 Cell 元素扩容 Cell 数组或者初始化 Cell 数组时使用 CAS 操作该变量来保证同时只有一个线程可以进行其中之一的操作 。 构造方法 sun .misc.Contended static final class Cell { volatile long value ; Cell (long x) { value x; } final boolean cas(long cmp, long val) { return UNSAFE.compareAndSwapLong(this , valueOffset , cmp , val) ; } // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE ; private static final long valueOffset ; static { try { UNSAFE sun. misc .Unsafe . getUnsafe() ; Class? ak Cell . class ; valueOffset UNSAFE . objectFieldOffset (ak . getDeclaredField (” value” )) ; } catch (Exception e) { throw new Error(e) ; } } }
可以看到 Cell 的构造很简单其内部维护一个被声明为 volatile 的变量 这里声 明 为 volatile 是因为线程操作 value 变量时没有使用锁 为 了 保证变量的内存可见性这里将其声 明为 volatile 的 。另外 cas 函数通过 CAS 操作保证了当前线程更新时被分配的 Cell 元素 中 value 值的原子性。另外 Cell 类使用sun.misc .Contended 修饰是为了避免伪共享。 sum方法 long sum返回 当前的值 内 部操作是累加所有 Cell 内 部的 value 值后再累加 base 。 例如下面的代码 由于计算总和时没有对 Cell 数组进行加锁所以在累加过程中 可能有其他线程对 Cell 中 的值进行了修改 也有可能对数组进行了扩容所 以 sum 返回的值并不是非常精确的 其返回值并不是一个调用 sum 方法时的原子快照值 。 public long sum() { Cell[] as cells; Cell a;long sum base ;if (as ! null) {for int i 0 ; i as.length ; i) {if ( (a as[i] ) ! null)sum a . value;} } return sum; }
reset方法 void reset为重置操作 如下代码把 base 置为 0 如果 Cell 数组有元素 则元素值被重置为 0 。 public void reset () { Cell[] as cells ; Cell a; base 0L ; if (as 1 null ) { for int i 0 i as . length ; i) { if ((aas[i]) ! null) { a.value 0L ; } } }
sumThenReset方法 long sumThenReset 是 sum 的改造版本在使用 sum 累加对应的 Cell 值后 把当前 Cell 的值重置为 0, base 重置为 0。这样 当多线程调用该方法时会有问题 比如考虑第一个调用 线程清空 Cell 的值则后一个线程调用时累加的都是 0 值 。 longValue方法 long longValue 等价于 sum 。 add 方法 public void add(long x) { Cell[] as; long b, v; int m; Cell a;if ((as cells) ! null || !casBase(b base, b x)) {boolean uncontended true;if (as null || (m as.length - 1) 0 ||(a as[getProbe() m]) null ||!(uncontended a.cas(v a.value, v x)))longAccumulate(x, null, uncontended);} }final boolean casBase(long cmp, long val) { return UNSAFE.compareAndSwapLong(this, BASE, cmp, val); } 代码 (1)首先看 cells 是否为 null 如果为 null 则当前在基础变量 base 上进行累加 这时候就类似 AtomicLong 的操作 。 如果 cells 不为 null 或者线程执行代码 1 的 CAS 操作失败了 则会去执行代码 。 。代码 2 3 决定当前线程应该访 问 cells 数组里面的哪一个 Cell 元素如果当前线程映射的元素存在则执行代码4使用 CAS 操作去更新分配的 Ce ll 元素 的 value 值如果当前线程映射的元素不存在或者存在但是 CAS 操作失败则执行代码 5 。其实将代码2(3) (4 合起来看就是获取当前线程应该访问的 cells 数组的 Cell 元素然后进行 CAS 更新操作只是在获取期间如果有些条件不满足则会跳转到代码 5 执行。另外当前线程应该访 问 cells 数组的哪一个 Cell 元素是通过 getProbe() m 进行计算的 其中 m 是当前cells 数组元素个数 1 , getProbe则用于获取 当前线程中变量 threadLocalRandomProbe 的值这个值一开始为 0在代码 5 里面会对其进行初始化。并且当前线程通过分配的Cell 元素的 cas 函数来保证对 Cell 元素 value 值更新的原子性。 longAccumulate 方法 这是 cells 数组被初始化和扩容的地方。 final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) { int h;if ((h getProbe()) 0) {ThreadLocalRandom.current(); // force initializationh getProbe();wasUncontended true;}boolean collide false; // True if last slot nonemptyfor (;;) {Cell[] as; Cell a; int n; long v;if ((as cells) ! null (n as.length) 0) {if ((a as[(n - 1) h]) null) {if (cellsBusy 0) { // Try to attach new CellCell r new Cell(x); // Optimistically createif (cellsBusy 0 casCellsBusy()) {boolean created false;try { // Recheck under lockCell[] rs; int m, j;if ((rs cells) ! null (m rs.length) 0 rs[j (m - 1) h] null) {rs[j] r;created true;}} finally {cellsBusy 0;}if (created)break;continue; // Slot is now non-empty}}collide false;}else if (!wasUncontended) // CAS already known to failwasUncontended true; // Continue after rehashelse if (a.cas(v a.value, ((fn null) ? v x :fn.applyAsLong(v, x))))break;else if (n NCPU || cells ! as)collide false; // At max size or staleelse if (!collide)collide true;else if (cellsBusy 0 casCellsBusy()) {try {if (cells as) { // Expand table unless staleCell[] rs new Cell[n 1];for (int i 0; i n; i)rs[i] as[i];cells rs;}} finally {cellsBusy 0;}collide false;continue; // Retry with expanded table}h advanceProbe(h);}else if (cellsBusy 0 cells as casCellsBusy()) {boolean init false;try { // Initialize tableif (cells as) {Cell[] rs new Cell[2];rs[h 1] new Cell(x);cells rs;init true;}} finally {cellsBusy 0;}if (init)break;}else if (casBase(v base, ((fn null) ? v x :fn.applyAsLong(v, x))))break; // Fall back on using base} }当每 个线程第 一次 执行到代码 6 时会初始化当前线程变 量threadLocalRandomProbe 的值上面也说了这个变量在计算当前线程应该被分配到 cells数组的哪一个 Cell 元素时会用到 。 cells 数组的初始化是在代码(14)的中进行的 其中 cellsBusy 是一 个标示 为 0 说明当前 cells 数组没有在被初始化或者扩容 也没有在新建 Cell 元素为 1则说明 cells 数组在被初始化或者扩容或者当前在创建新的 Cell 元素、通过 CAS 操作来进行 0 或 1状态的切换这里使用 casCellsBusy 函数。假设当 前线程通过 CAS 设置 cellsBusy 为 1则当前线程开始初始化操作那么这时候其他线程就不能进行扩容了 。 如代码 14.1 初始化cells 数组元 素个数为 2 然后使用 h1 计 算当前线程应该访问 celll 数组的哪个位置也就是使用当前线程的 threadLocalRandomProbe 变量值 ( cells 数组元素个数 1 然后标示 cells 数组已经被初始化最后代码 14.3 重置了 cellsBusy 标记 。 显然这里没有使用CAS 操作却是线程安全的原因是cellsBusy 是 volatile 类型的这保证了变量的内存可见性另外此时其他地方的代码没有机会修改 cellsBusy 的值 。 在这里初始化的 cells 数组里面的两个元素的值目前还是 null 。 cells 数组的扩容是在代码 (12 中进行的对 cells 扩容是有条件的也就是代码 10) ( 11 )的条件都不满足的时候 。具体就是当前 cells 的元素个数小于当前机器 CPU 个数并且当前多个线程访 问了 cells 中同 一个元素从而导致冲突使其中 一个线程 CAS 失败时才会进行扩容操作。这里为何要涉及 CPU 个数呢其实在基础篇中己经讲过 只有当每个 CPU 都运行一个线程时才会使多线程的效果最佳也就是当 cells 数组元素个数与 CPU 个数一致时每个 Cell 都使用 一个 CPU 进行处理这时性能才是最佳的 。 代码 (12 中的扩容操作也是先通过 CAS 设置 cellsBusy 为 1 然后才能进行扩容 。 假设 CAS 成功则执行代码(l2.1将容量扩充为之前的 2 倍并复制 Cell 元素到扩容后数组 。 另外扩容后 cells 数组里面除了包含复制过来的元素外还包含其他新元素这些元素的值目前还是 null 。 在代码 (7) (8中当前线程调用 add 方法并根据当前线程的随机数threadLoca!RandomProbe 和 cells 元 素个数计算要访问的 Cell 元素下标然后如果发现对应下标元素的值为 null则新增一个 Cell 元素到 cells 数组并且在将其添加到 cells 数组之前要竞争设置 cellsBusy 为 1 。 代码 13 对 CAS 失败的线程重新计算当前线程的随机值 threadLocalRandomProbe, 以减少下次访问 cells 元素时的冲突机会。 总结 本节介绍了 JDK 8 中新增的 LongAdder 原子性操作类该类通过内部 cells 数组分担了高并发下多线程同时对一个原子变量进行更新时的竞争量让多个线程可 以 同时对 cells数组里面的元素进行并行的更新操作 。 另外数组元素 Cell 使用sun .misc .Contended 注解进行修饰 这避免了 cells 数组 内 多个原子变量被放入 同 一个缓存行 也就是避免了 伪共享这对性能也是一个提升 。