1.簡介
我們程序常常在并發(fā)場景下使用哭廉,所以我們常常使用支持并發(fā)的數(shù)據(jù)結(jié)構(gòu)盘榨,在java中有很多自帶的java支持高并發(fā)的數(shù)據(jù)結(jié)構(gòu)解总,在java.util.concurrent包下宇葱,基本由Doug Lea 編寫。這次我們自己實現(xiàn)一個無鎖高并發(fā)隊列往史。
2. 基本原理
基本原理還是使用java.concurrent.atomic 包下的原子類仗颈,結(jié)合最終一致性即可
3. 實現(xiàn)
package liusheng.main;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.IntStream;
/**
* 使用的是原子類
*
* @param <E>
*/
public class ConcurrentQueue<E> {
static class Node<E> {
E e;
volatile Node<E> next;
public Node(E e) {
this.e = e;
}
}
AtomicReference<Node<E>> head = new AtomicReference<>();
AtomicReference<Node<E>> last = new AtomicReference<>();
AtomicInteger size = new AtomicInteger();
public ConcurrentQueue() {
Node<E> node = new Node<E>(null);
head.set(node);
last.set(node);
}
public void offer(E e) {
Node<E> node = new Node<>(e);
Node<E> pre = null;
do {
pre = last.get();
} while (!last.compareAndSet(pre, node));
pre.next = node;
size.getAndIncrement();
}
public E poll() {
Node<E> node, node1;
do {
node = head.get();
node1 = node.next;
} while (node1 != null && !head.compareAndSet(node, node1));
if (node1 != null) {
size.decrementAndGet();
return node1.e;
}
return null;
}
public static void main(String[] args) throws InterruptedException {
while (true) {
ExecutorService executorService = Executors.newFixedThreadPool(20);
ConcurrentQueue<Long> queue = new ConcurrentQueue<>();
IntStream.range(0, 10).boxed().map(i -> (Runnable) () -> {
for (int j = 0; j < 1000; j++) {
queue.offer(System.currentTimeMillis());
}
}).forEach(executorService::execute);
AtomicInteger a = new AtomicInteger();
IntStream.range(0, 10).boxed().map(i -> (Runnable) () -> {
for (int j = 0; j < 1000; j++) {
if (queue.poll() != null) {
a.getAndIncrement();
}
}
}).forEach(executorService::execute);
executorService.shutdown();
executorService.awaitTermination(2, TimeUnit.DAYS);
// System.out.println(a.get() + queue.size.get());
assert a.get() + queue.size.get() == 10000;
}
}
}
4 總結(jié)
我們自己實現(xiàn)了一個無鎖高并發(fā)的隊列,可以加深我們對原子類的理解和使用椎例,同時我們在以后看別人的實現(xiàn)挨决,也容易了很多,也加深的了我對多線程的理解订歪。有什么錯誤的地方望大佬指正