首先介紹下 LongAddr
這個(gè)類的作用,這個(gè)類俐镐,是JDK8 以后提供的新的矫限,用于并發(fā)計(jì)數(shù)的工具。以前我們并發(fā)的計(jì)數(shù)的話京革,使用的都是 AtomicLong
但是這個(gè)原子類奇唤,性能是有不足的,因?yàn)楫?dāng)存在大量并發(fā)的時(shí)候匹摇,可能很多的 CAS
操作都是失敗的咬扇,所以線程會(huì)不斷的重試CAS
,但是每一批中能成功的 CAS
操作只會(huì)有一個(gè)廊勃,當(dāng)并發(fā)太高時(shí)候懈贺,會(huì)出現(xiàn)性能問題!
LongAddr
采用的方法是坡垫,共享熱點(diǎn)數(shù)據(jù)分離的計(jì)數(shù)
梭灿,將一個(gè)數(shù)字的值拆分為一個(gè)數(shù)組!然后這個(gè)數(shù)組冰悠,后面修改這個(gè)值的時(shí)候堡妒,就修改這個(gè)數(shù)組的一部分!如果要得到這個(gè)數(shù)字的話溉卓,就要把這個(gè)值加起來皮迟!這樣的技術(shù)搬泥,能讓并發(fā)量大大提高!
熱點(diǎn)數(shù)據(jù)分離
也是分段鎖思想的一種延伸伏尼!將原本的對(duì)一個(gè)值的樂觀鎖忿檩,分離成多個(gè)值的樂觀鎖。
LongAddr 的優(yōu)點(diǎn):有很高性能的并發(fā)寫的能力爆阶!
缺點(diǎn):讀取的性能不是很高效燥透!而且可能不是很準(zhǔn)確,因?yàn)槭且?shù)組所有的值相加
---源碼解析---
LongAddr
繼承自 Striped64
這個(gè)類
Striped64
這個(gè)類辨图,有一個(gè) Cell
的內(nèi)部類班套,這個(gè)類有個(gè) Contended
注解,這個(gè)注解故河,用于將數(shù)據(jù)放置在不同的緩沖行中孽尽,這樣可以避免偽共享問題,提供并發(fā)量(偽共享問題忧勿,后面單獨(dú)說明)偽共享
Striped64
還有 cells
這個(gè) Cell
數(shù)組,這個(gè)用于保存我們所說的瞻讽,分離的數(shù)據(jù)
base
字段,在并發(fā)量不大速勇,沒有導(dǎo)致CAS失敗的情況下揪惦,會(huì)直接修改這個(gè)值!
cellsBusy
陨晶,這個(gè)值為 1 代表我們正在修改cells的大惺簟!
LongAddr 最核心的方法就是 add
方法了:
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
// 如果cells數(shù)組不為空漓穿,或者嗤军,cas的修改base的值失敗了
// cells數(shù)組不為空,那么代表曾經(jīng)發(fā)生過并發(fā)修改失敗了
// casBase修改失敗晃危,代表本次并發(fā)修改失敗了
if ((as = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
// 如果 cells數(shù)組是空的叙赚,直接進(jìn)入longAccumulate
// 如果 cells數(shù)組小于1 直接進(jìn)入
// 如果 cells 當(dāng)前 slot的值為null 直接進(jìn)入
// 如果 cas 當(dāng)前 slot的值失敗了直接進(jìn)入
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
longAccumulate(x, null, uncontended);
}
}
add
方法總結(jié):
我們總是 如果
cells
不為NULL
,那么嘗試修改cells
對(duì)應(yīng)的slot
的值
如果cells
是空的僚饭,那么我們就嘗試修改base
,base
修改失敗了震叮,代表當(dāng)前有并發(fā),也會(huì)進(jìn)入longAccumulate
上面的add操作的核心方法是:longAccumulate
這個(gè)方法是Striped64提供的:
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
int h;
// getProbe 方法是當(dāng)前線程的隨機(jī)數(shù)鳍鸵!使用這個(gè)就能隨機(jī)的修改cells的值了
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current(); // force initialization
h = getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
for (;;) {
Cell[] as; Cell a; int n; long v;
// 如果 cells 是空的苇瓣,代表第一次發(fā)生并發(fā)!
if ((as = cells) != null && (n = as.length) > 0) {
// 進(jìn)入此處的偿乖,都是cells不為空的
// 隨機(jī)選擇一個(gè) slot 击罪,如果是空的,則創(chuàng)建一個(gè)新的Cell添加進(jìn)去贪薪!
if ((a = as[(n - 1) & h]) == null) {
if (cellsBusy == 0) { // Try to attach new Cell
Cell r = new Cell(x); // Optimistically create
// 設(shè)置 Busy
if (cellsBusy == 0 && casCellsBusy()) {
boolean created = false;
try { // Recheck under lock
Cell[] rs; int m, j;
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
// 如果創(chuàng)建成功媳禁,則可以直接退出
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
// 這里是如果發(fā)生了鎖競爭,并且當(dāng)前的slot已經(jīng)被占用了
// 進(jìn)入這里 画切,代表選擇下一個(gè) 隨機(jī)值竣稽,然后再來一次
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
// 如果兩次隨機(jī)值 都被占用了,那么就嘗試設(shè)置下 新增 當(dāng)前Slot的值
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
// 如果還是失敗了霍弹,那么再判斷下毫别,cells 的個(gè)數(shù)能否添加了,
// cells 個(gè)數(shù)最大是 NCPU 的最近的2的倍數(shù)典格!
// 如果 cells 數(shù)目已經(jīng)大于等于這個(gè)值了岛宦,那么只會(huì) 一直的改變slot
// 如果slot都被占用的話,就一直嘗試去加在這個(gè)slot的值上
else if (n >= NCPU || cells != as)
collide = false; // At max size or stale
else if (!collide)
collide = true;
// 如果cells的值 不太夠钝计!小于 NCPU的話恋博!會(huì)進(jìn)入這里擴(kuò)容cells
// 擴(kuò)容的話,首先也是 設(shè)置 busy
else if (cellsBusy == 0 && casCellsBusy()) {
try {
// 直接復(fù)制數(shù)據(jù)私恬!
if (cells == as) { // Expand table unless stale
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
// 上面的每次循環(huán)結(jié)束的話债沮,都會(huì)改變下Probe
// 這樣就實(shí)現(xiàn)了隨機(jī)的修改cells的某個(gè)元素!
h = advanceProbe(h);
}
// 這里是初始化 Cells的邏輯本鸣,先檢查 cellsBusy
// 如果有線程進(jìn)入了Busy的話疫衩,就執(zhí)行下一步,也就是casbase荣德,再不行就輪訓(xùn)操作闷煤!
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try { // Initialize table
if (cells == as) {
//默認(rèn)的Cells大小為2
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
// 設(shè)置完畢童芹,就可以退出了
if (init)
break;
}
// 如果 隊(duì)列為空,并且有其他線程鲤拿,正在實(shí)例化假褪,cells的話
// 就去修改base的值!
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break; // Fall back on using base
}
}
總結(jié)下 add
方法:
假設(shè)我們現(xiàn)在對(duì)一個(gè) LongAddr
對(duì)象近顷,進(jìn)行寫操作
- 首先生音,是對(duì)
base
進(jìn)行cas
的寫,如果cas
的寫失敗了窒升,那么代表有線程在進(jìn)行競爭缀遍! - 此時(shí)
cells
是空的,方法執(zhí)行進(jìn)入longAccumulate
方法饱须,這個(gè)方法域醇,會(huì)實(shí)例化一個(gè) 新的cells
數(shù)組 - 之后如果有線程,執(zhí)行
add
方法的時(shí)候蓉媳,首先去修改cells
數(shù)組里面的對(duì)象的值譬挚。具體修改哪個(gè)值是按照線程的隨機(jī)數(shù),隨機(jī)選出來的酪呻,但是cells
數(shù)組的大小是 CPU 數(shù)量的最近的2倍值殴瘦!后面的并發(fā)修改,大部分都是 并行的修改cells
里面元素的Value
的值号杠!這樣相當(dāng)于,將原本的一個(gè)鎖丰歌,分化成了 多個(gè)鎖姨蟋,提高了并發(fā)度!
LongAddr
還有其他幾個(gè)方法立帖,基本上都是 間接的調(diào)用這個(gè) add 方法:
increment
: add(1L)
decrement
:add(-1L)
為了獲取眼溶,LongAddr 的結(jié)果,我們需要調(diào)用 sum 方法:
public long sum() {
Cell[] as = cells; Cell a;
long sum = base;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
上代碼可以看出:結(jié)果就是 sum + cells
的結(jié)果晓勇!
但是這個(gè)結(jié)果是不一定準(zhǔn)確的堂飞!因?yàn)榭赡茏x取的時(shí)候,可能有的線程正在寫數(shù)據(jù)绑咱!
reset
: 方法就是把 base 設(shè)置為 0 绰筛,并且每個(gè) cells 的元素的值都設(shè)置為0
除了LongAddr
之外,描融,Java還提供了 DoubleAdder
的實(shí)現(xiàn)铝噩,這個(gè)類的實(shí)現(xiàn) 調(diào)用的是 doubleAccumulate
方法,這個(gè)方法的實(shí)現(xiàn)和longAccumulate
的實(shí)現(xiàn) 非常類似窿克!就是通過 doubleToRawLongBits
方法將都double
的值轉(zhuǎn)換成Long
骏庸!