原子累加器之LongAdder

一、原子累加器对比

我们通过如下代码,比较synchronizedAtomicIntegerAtomicLongLongAdderLongAccumulator五种计数性能。

示例

需求:热点商品点赞计算器,点赞数进行统计,不要求实时精确。50个线程,每个线程100W次,统计总点赞数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
package site.weiyikai.demo06;


import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAccumulator;
import java.util.concurrent.atomic.LongAdder;

/**
* @author xiaowei
* @create 2023-03-29 19:28
*/
//需求:热点商品点赞计算器,点赞数进行统计,不要求实时精确。50个线程,每个线程100W次,统计总点赞数。
public class ClickNumber {
int number = 0;
public synchronized void add_synchronized(){
number++;
}

AtomicInteger atomicInteger = new AtomicInteger();
public void add_AtomicInteger(){
atomicInteger.incrementAndGet();
}

AtomicLong atomicLong = new AtomicLong();
public void add_AtomicLong(){
atomicLong.incrementAndGet();
}

LongAdder longAdder = new LongAdder();
public void add_LongAdder(){
longAdder.increment();
}

LongAccumulator longAccumulator = new LongAccumulator((x, y)->x+y,0);
public void add_longAccumulator(){
longAccumulator.accumulate(1);
}
}

public class LongAdderCalcDemo {
public static final int SIZE_THREAD = 50;
public static final int _1w = 10000;

public static void main(String[] args) throws InterruptedException {
ClickNumber clickNumber = new ClickNumber();
long startTime = System.currentTimeMillis();
long endTime = System.currentTimeMillis();
CountDownLatch latch_synchronized = new CountDownLatch(SIZE_THREAD);
CountDownLatch latch_AtomicInteger= new CountDownLatch(SIZE_THREAD);
CountDownLatch latch_AtomicLong = new CountDownLatch(SIZE_THREAD);
CountDownLatch latch_LongAdder = new CountDownLatch(SIZE_THREAD);
CountDownLatch latch_LongAccumulator = new CountDownLatch(SIZE_THREAD);

startTime = System.currentTimeMillis();
for (int i = 1; i <= SIZE_THREAD; i++) {
new Thread(()->{
try {
for (int j = 1; j <= 100*_1w; j++) {
clickNumber.add_synchronized();
}
} catch (Exception e) {
e.printStackTrace();
} finally {
latch_synchronized.countDown();
}
},String.valueOf(i)).start();
}
latch_synchronized.await();
endTime = System.currentTimeMillis();
System.out.println("synchronized花费时间:"+ (endTime-startTime)+" 数值为:"+clickNumber.number);


startTime = System.currentTimeMillis();
for (int i = 1; i <= SIZE_THREAD; i++) {
new Thread(()->{
try {
for (int j = 1; j <= 100*_1w; j++) {
clickNumber.add_AtomicInteger();
}
} catch (Exception e) {
e.printStackTrace();
} finally {
latch_AtomicInteger.countDown();
}
},String.valueOf(i)).start();
}
latch_AtomicInteger.await();
endTime = System.currentTimeMillis();
System.out.println("AtomicInteger花费时间:"+ (endTime-startTime)+" 数值为:"+clickNumber.atomicInteger.get());

startTime = System.currentTimeMillis();
for (int i = 1; i <= SIZE_THREAD; i++) {
new Thread(()->{
try {
for (int j = 1; j <= 100*_1w; j++) {
clickNumber.add_AtomicLong();
}
} catch (Exception e) {
e.printStackTrace();
} finally {
latch_AtomicLong.countDown();
}
},String.valueOf(i)).start();
}
latch_AtomicLong.await();
endTime = System.currentTimeMillis();
System.out.println("AtomicLong花费时间:"+ (endTime-startTime)+" 数值为:"+clickNumber.atomicLong.get());

startTime = System.currentTimeMillis();
for (int i = 1; i <= SIZE_THREAD; i++) {
new Thread(()->{
try {
for (int j = 1; j <= 100*_1w; j++) {
clickNumber.add_LongAdder();
}
} catch (Exception e) {
e.printStackTrace();
} finally {
latch_LongAdder.countDown();
}
},String.valueOf(i)).start();
}
latch_LongAdder.await();
endTime = System.currentTimeMillis();
System.out.println("LongAdder花费时间:"+ (endTime-startTime)+" 数值为:"+clickNumber.longAdder.longValue());

startTime = System.currentTimeMillis();
for (int i = 1; i <= SIZE_THREAD; i++) {
new Thread(()->{
try {
for (int j = 1; j <= 100*_1w; j++) {
clickNumber.add_longAccumulator();
}
} catch (Exception e) {
e.printStackTrace();
} finally {
latch_LongAccumulator.countDown();
}

},String.valueOf(i)).start();
}
latch_LongAccumulator.await();
endTime = System.currentTimeMillis();
System.out.println("LongAccumulator花费时间:"+ (endTime-startTime)+" 数值为:"+clickNumber.longAccumulator.longValue());

}
}

运行结果

1
2
3
4
5
synchronized花费时间:1314 数值为:50000000
AtomicInteger花费时间:609 数值为:50000000
AtomicLong花费时间:639 数值为:50000000
LongAdder花费时间:94 数值为:50000000
LongAccumulator花费时间:45 数值为:50000000

通过上面的代码运行我们可以清晰地看到,每种方式都有效的产完成了累加的效果,但是明显使用LongAdder的效率要更好,甚至要高出AtomicLong好几倍。

现在,我们可以简单地理解为,原子累加器就是JAVA在并发编程下提供的有保障的累加手段。

那么LongAdder是如何做的这一点的呢?接下来我们慢慢进行分析。

二、LongAdder概述

JDK1.8时,java.util.concurrent.atomic包中提供了一个新的原子类:LongAdder

根据Oracle官方文档的介绍,LongAdder在高并发的场景下会比它的前辈AtomicLong 具有更好的性能,代价是消耗更多的内存空间。

  • AtomicLong存在的问题

在并发量较低的环境下,线程冲突的概率比较小,自旋的次数不会很多。但是,高并发环境下,N个线程同时进行自旋操作,会出现大量失败并不断自旋的情况,此时AtomicLong的自旋会成为瓶颈。

这就是LongAdder引入的初衷,解决高并发环境下AtomicLong的自旋瓶颈问题。

  • 使用场景

在大数据处理过程,为了方便监控,需要统计数据,少不了原子计数器。为了尽量优化性能,需要采用高效的原子计数器。
​ 在jdk8中,引入了LongAdder,非常适合多线程原子计数器。与AtomicLong做了一个测试,LongAdder在多线程环境中,原子自增长性能要好很多。它常用于状态采集、统计等场景。

三、LongAdder原理

3.1 AtomicLong和LongAdder比较

  • AtomicLong

image-20230329195927478

AtomicLong中有个内部变量value保存着实际的long值,所有的操作都是针对该变量进行。也就是说,高并发环境下,value变量其实是一个热点,也就是N个线程竞争一个热点。

  • LongAdder

image-20230329195947760

LongAdder的基本思路就是分散热点,将value值分散到一个数组中,不同线程会命中到数组的不同槽中,各个线程只对自己槽中的那个值进行CAS操作,这样热点就被分散了,冲突的概率就小很多。如果要获取真正的long值,只要将各个槽中的变量值累加返回。

AtomicLong是多个线程针对单个热点值value进行原子操作。而LongAdder是每个线程拥有自己的槽,各个线程一般只对自己槽中的那个值进行CAS操作。

3.2 原理

image-20230329200103587

LongAdder 在无竞争条件下,跟 AtomicInteger 一样,对同一个 base 进行操作。

当出现了多线程竞争时采用化整为零的做法,用空间换时间,用一个数组cells[],将一个 value 拆分进这个 cells[] 数组,多线程同时对各个 cell 中的 value 进行操作,可以对线程 id 进行 hash,再根据 hash 映射到这个数组的某个索引,然后对 cell 中的 value 进行 cas 操作,当所有线程都执行完毕,累加所有 cell 中的value 值 和 无竞争下的 base 即为最终结果。
总结一句公式: LongAdder = base基础数据 + cell[] + CAS 操作,用空间换时间所以快。

四、基类Striped64内部三个重要成员

LongAdder 继承于 Stripped64 类,base 值和 cell 数组都在 Stripped64 类中定义,它的内部三个重要的成员如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* 成员一:存放Cell的哈希表,大小为2的幂
*/
transient volatile Cell[] cells;
/**
* 成员二:基础值
* 1.在没有竞争时会更新这个值
* 2.在cells初始化时,cells不可用,也会尝试通过CAS操作值累加到base
*/
transient volatile long base;
/**
* 自旋锁,通过CAS操作加锁,为0表示cells数组没有处于创建、扩容阶段
* 为1表示正在创建或者扩展cells数组,不能进行新Cell元素的设置操作
*/
transient volatile int cellsBusy;

Stripped64 内部包含一个 base 和一个 Cell[] 类型的 cells 数组,cells 数组又叫哈希表,在没有竞争的情况下,要累加的数通过CAS累加到 base 上,如果有竞争的话,会将要累加的数累加到 cells 数组中的某个 Cell 元素里面。

Stripped64 的整体值 value 的获取函数如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public long longValue() {
return sum();
}

/**
* 将多个cells数组中的值加起来的和
*/
public long sum() {
Cell[] as = cells;
Cell a;
long sum = base;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}

Stripped64 的设计核心思路是通过内部的分散计算来避免竞争,以空间换取时间。没有竞争时 cells 数组为 null,这时只使用 base,一旦发生竞争,cells 数组就上场了。

cells 数组第一次初始化长度为2,以后每次扩容都变为原来的两倍,一直到 cells 数组的长度大于等于当前服务器的CPU核数,同一时刻能持有CPU时间片去并发操作同一内存地址的最大线程数最多也就是CPU的核数。

在存在线程争用的时候,每个线程被映射到 cells[threadLocalRandomProbe&cells.length] 位置的 Cell 元素,该线程对 value 所做的累加操作就执行在对应的 Cell 元素的值上。


原子累加器之LongAdder
http://example.com/2023/03/29/原子累加器之LongAdder/
作者
程序员小魏
发布于
2023年3月29日
许可协议