一、原子累加器对比
我们通过如下代码,比较synchronized
、AtomicInteger
、AtomicLong
、LongAdder
、LongAccumulator
五种计数性能。
示例
需求:热点商品点赞计算器,点赞数进行统计,不要求实时精确。50个线程,每个线程100W次,统计总点赞数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149
| package site.weiyikai.demo06;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.LongAccumulator; import java.util.concurrent.atomic.LongAdder;
public class ClickNumber { int number = 0; public synchronized void add_synchronized(){ number++; }
AtomicInteger atomicInteger = new AtomicInteger(); public void add_AtomicInteger(){ atomicInteger.incrementAndGet(); }
AtomicLong atomicLong = new AtomicLong(); public void add_AtomicLong(){ atomicLong.incrementAndGet(); }
LongAdder longAdder = new LongAdder(); public void add_LongAdder(){ longAdder.increment(); }
LongAccumulator longAccumulator = new LongAccumulator((x, y)->x+y,0); public void add_longAccumulator(){ longAccumulator.accumulate(1); } }
public class LongAdderCalcDemo { public static final int SIZE_THREAD = 50; public static final int _1w = 10000;
public static void main(String[] args) throws InterruptedException { ClickNumber clickNumber = new ClickNumber(); long startTime = System.currentTimeMillis(); long endTime = System.currentTimeMillis(); CountDownLatch latch_synchronized = new CountDownLatch(SIZE_THREAD); CountDownLatch latch_AtomicInteger= new CountDownLatch(SIZE_THREAD); CountDownLatch latch_AtomicLong = new CountDownLatch(SIZE_THREAD); CountDownLatch latch_LongAdder = new CountDownLatch(SIZE_THREAD); CountDownLatch latch_LongAccumulator = new CountDownLatch(SIZE_THREAD);
startTime = System.currentTimeMillis(); for (int i = 1; i <= SIZE_THREAD; i++) { new Thread(()->{ try { for (int j = 1; j <= 100*_1w; j++) { clickNumber.add_synchronized(); } } catch (Exception e) { e.printStackTrace(); } finally { latch_synchronized.countDown(); } },String.valueOf(i)).start(); } latch_synchronized.await(); endTime = System.currentTimeMillis(); System.out.println("synchronized花费时间:"+ (endTime-startTime)+" 数值为:"+clickNumber.number);
startTime = System.currentTimeMillis(); for (int i = 1; i <= SIZE_THREAD; i++) { new Thread(()->{ try { for (int j = 1; j <= 100*_1w; j++) { clickNumber.add_AtomicInteger(); } } catch (Exception e) { e.printStackTrace(); } finally { latch_AtomicInteger.countDown(); } },String.valueOf(i)).start(); } latch_AtomicInteger.await(); endTime = System.currentTimeMillis(); System.out.println("AtomicInteger花费时间:"+ (endTime-startTime)+" 数值为:"+clickNumber.atomicInteger.get());
startTime = System.currentTimeMillis(); for (int i = 1; i <= SIZE_THREAD; i++) { new Thread(()->{ try { for (int j = 1; j <= 100*_1w; j++) { clickNumber.add_AtomicLong(); } } catch (Exception e) { e.printStackTrace(); } finally { latch_AtomicLong.countDown(); } },String.valueOf(i)).start(); } latch_AtomicLong.await(); endTime = System.currentTimeMillis(); System.out.println("AtomicLong花费时间:"+ (endTime-startTime)+" 数值为:"+clickNumber.atomicLong.get());
startTime = System.currentTimeMillis(); for (int i = 1; i <= SIZE_THREAD; i++) { new Thread(()->{ try { for (int j = 1; j <= 100*_1w; j++) { clickNumber.add_LongAdder(); } } catch (Exception e) { e.printStackTrace(); } finally { latch_LongAdder.countDown(); } },String.valueOf(i)).start(); } latch_LongAdder.await(); endTime = System.currentTimeMillis(); System.out.println("LongAdder花费时间:"+ (endTime-startTime)+" 数值为:"+clickNumber.longAdder.longValue());
startTime = System.currentTimeMillis(); for (int i = 1; i <= SIZE_THREAD; i++) { new Thread(()->{ try { for (int j = 1; j <= 100*_1w; j++) { clickNumber.add_longAccumulator(); } } catch (Exception e) { e.printStackTrace(); } finally { latch_LongAccumulator.countDown(); }
},String.valueOf(i)).start(); } latch_LongAccumulator.await(); endTime = System.currentTimeMillis(); System.out.println("LongAccumulator花费时间:"+ (endTime-startTime)+" 数值为:"+clickNumber.longAccumulator.longValue());
} }
|
运行结果
1 2 3 4 5
| synchronized花费时间:1314 数值为:50000000 AtomicInteger花费时间:609 数值为:50000000 AtomicLong花费时间:639 数值为:50000000 LongAdder花费时间:94 数值为:50000000 LongAccumulator花费时间:45 数值为:50000000
|
通过上面的代码运行我们可以清晰地看到,每种方式都有效的产完成了累加的效果,但是明显使用LongAdder的效率要更好,甚至要高出AtomicLong好几倍。
现在,我们可以简单地理解为,原子累加器就是JAVA在并发编程下提供的有保障的累加手段。
那么LongAdder
是如何做的这一点的呢?接下来我们慢慢进行分析。
二、LongAdder概述
JDK1.8时,java.util.concurrent.atomic
包中提供了一个新的原子类:LongAdder
。
根据Oracle官方文档的介绍,LongAdder
在高并发的场景下会比它的前辈AtomicLong
具有更好的性能,代价是消耗更多的内存空间。
在并发量较低的环境下,线程冲突的概率比较小,自旋的次数不会很多。但是,高并发环境下,N个线程同时进行自旋操作,会出现大量失败并不断自旋的情况,此时AtomicLong
的自旋会成为瓶颈。
这就是LongAdder
引入的初衷,解决高并发环境下AtomicLong
的自旋瓶颈问题。
在大数据处理过程,为了方便监控,需要统计数据,少不了原子计数器。为了尽量优化性能,需要采用高效的原子计数器。
在jdk8中,引入了LongAdder
,非常适合多线程原子计数器。与AtomicLong
做了一个测试,LongAdder
在多线程环境中,原子自增长性能要好很多。它常用于状态采集、统计等场景。
三、LongAdder原理
3.1 AtomicLong和LongAdder比较
AtomicLong
中有个内部变量value保存着实际的long值,所有的操作都是针对该变量进行。也就是说,高并发环境下,value变量其实是一个热点,也就是N个线程竞争一个热点。
LongAdder
的基本思路就是分散热点,将value值分散到一个数组中,不同线程会命中到数组的不同槽中,各个线程只对自己槽中的那个值进行CAS操作,这样热点就被分散了,冲突的概率就小很多。如果要获取真正的long值,只要将各个槽中的变量值累加返回。
AtomicLong
是多个线程针对单个热点值value进行原子操作。而LongAdder
是每个线程拥有自己的槽,各个线程一般只对自己槽中的那个值进行CAS操作。
3.2 原理
LongAdder
在无竞争条件下,跟 AtomicInteger
一样,对同一个 base 进行操作。
当出现了多线程竞争时采用化整为零的做法,用空间换时间,用一个数组cells[]
,将一个 value 拆分进这个 cells[]
数组,多线程同时对各个 cell 中的 value 进行操作,可以对线程 id 进行 hash,再根据 hash 映射到这个数组的某个索引,然后对 cell 中的 value 进行 cas 操作,当所有线程都执行完毕,累加所有 cell 中的value 值 和 无竞争下的 base 即为最终结果。
总结一句公式: LongAdder = base基础数据 + cell[] + CAS 操作,用空间换时间所以快。
四、基类Striped64内部三个重要成员
LongAdder
继承于 Stripped64
类,base
值和 cell
数组都在 Stripped64
类中定义,它的内部三个重要的成员如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
|
transient volatile Cell[] cells;
transient volatile long base;
transient volatile int cellsBusy;
|
Stripped64
内部包含一个 base
和一个 Cell[]
类型的 cells
数组,cells
数组又叫哈希表,在没有竞争的情况下,要累加的数通过CAS累加到 base
上,如果有竞争的话,会将要累加的数累加到 cells
数组中的某个 Cell
元素里面。
Stripped64
的整体值 value
的获取函数如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| public long longValue() { return sum(); }
public long sum() { Cell[] as = cells; Cell a; long sum = base; if (as != null) { for (int i = 0; i < as.length; ++i) { if ((a = as[i]) != null) sum += a.value; } } return sum; }
|
Stripped64
的设计核心思路是通过内部的分散计算来避免竞争,以空间换取时间。没有竞争时 cells
数组为 null,这时只使用 base
,一旦发生竞争,cells
数组就上场了。
cells
数组第一次初始化长度为2,以后每次扩容都变为原来的两倍,一直到 cells
数组的长度大于等于当前服务器的CPU核数,同一时刻能持有CPU时间片去并发操作同一内存地址的最大线程数最多也就是CPU的核数。
在存在线程争用的时候,每个线程被映射到 cells[threadLocalRandomProbe&cells.length]
位置的 Cell 元素,该线程对 value
所做的累加操作就执行在对应的 Cell
元素的值上。