使用BlockingQueue實現
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Producer half of the producer/consumer demo.
 *
 * <p>Each run loop builds a {@link PCData} task, offers it to the shared queue
 * (waiting up to two seconds for space) and then pauses for a random interval
 * below one second. The loop ends when {@link #stop()} is called or the
 * running thread is interrupted.
 */
public class Producer implements Runnable {
    /** In-memory buffer shared with the consumers. */
    private final BlockingQueue<PCData> queue;
    /** Running total of created tasks; static and atomic so every producer draws from one sequence. */
    private static final AtomicInteger count = new AtomicInteger();
    /** Loop flag cleared by {@link #stop()}; volatile so the worker thread observes the write. */
    private volatile boolean isRunning = true;
    /** Source of the random pause; created once rather than once per loop iteration. */
    private final Random random = new Random();

    /**
     * @param queue the buffer that produced tasks are offered to
     */
    public Producer(BlockingQueue<PCData> queue) {
        this.queue = queue;
    }

    /**
     * Produces tasks until stopped or interrupted.
     */
    @Override
    public void run() {
        System.out.println("start producer id=" + Thread.currentThread().getId());
        try {
            while (isRunning) {
                // Build the task payload from the next value of the shared counter.
                PCData data = new PCData(count.incrementAndGet());
                // Report success only after the queue has actually accepted the item;
                // offer() returns false when no space became available within the timeout.
                if (queue.offer(data, 2, TimeUnit.SECONDS)) {
                    System.out.println(data + " is put into queue");
                } else {
                    System.out.println("failed to put data:" + data);
                }
                Thread.sleep(random.nextInt(1000));
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Restore the interrupt flag so the owner of this thread can see it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Asks the run loop to finish; it exits the next time the loop condition is checked.
     */
    public void stop() {
        isRunning = false;
    }
}
/**
 * Immutable task payload handed from producers to consumers; wraps a single int.
 */
public final class PCData {
    /** The wrapped value, fixed at construction time. */
    private final int data;

    /**
     * @param data the value this task carries
     */
    public PCData(int data) {
        this.data = data;
    }

    /**
     * @return the value this task carries
     */
    public int getData() {
        return data;
    }

    /**
     * @return a text form of this task, e.g. {@code PCData{data=5}}
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("PCData{");
        text.append("data=").append(data);
        text.append('}');
        return text.toString();
    }
}
import java.text.MessageFormat;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
/**
 * Consumer half of the producer/consumer demo.
 *
 * <p>Takes {@link PCData} tasks from the shared queue, prints the square of each
 * task's value and then pauses for a random interval below one second. The loop
 * has no stop flag: it ends only when the running thread is interrupted.
 */
public class Consumer implements Runnable {
    /** In-memory buffer shared with the producers. */
    private final BlockingQueue<PCData> queue;
    /** Source of the random pause; created once rather than once per loop iteration. */
    private final Random random = new Random();

    /**
     * @param queue the buffer that tasks are taken from
     */
    public Consumer(BlockingQueue<PCData> queue) {
        this.queue = queue;
    }

    /**
     * Consumes tasks until the thread is interrupted.
     */
    @Override
    public void run() {
        System.out.println("start Consumer id=" + Thread.currentThread().getId());
        try {
            while (true) {
                // take() blocks until an element is available and never returns null,
                // so no null check is needed on the result.
                PCData data = queue.take();
                // Widen to long before multiplying so the square cannot overflow int.
                long value = data.getData();
                System.out.println(MessageFormat.format("{0}*{1}={2}",
                        value, value, value * value));
                Thread.sleep(random.nextInt(1000));
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Restore the interrupt flag so the owner of this thread can see it.
            Thread.currentThread().interrupt();
        }
    }
}
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
/**
 * Demo driver: runs three producers and three consumers against one bounded
 * queue, stops the producers after ten seconds, allows three more seconds for
 * the queue to drain and then shuts the thread pool down.
 */
public class Client {
    /**
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while sleeping
     *         or while waiting for the pool to terminate
     */
    public static void main(String[] args) throws InterruptedException {
        // Create the shared buffer (capacity 10) and the workers.
        BlockingQueue<PCData> queue = new LinkedBlockingDeque<>(10);
        ExecutorService service = Executors.newCachedThreadPool();
        Producer producer1 = new Producer(queue);
        Producer producer2 = new Producer(queue);
        Producer producer3 = new Producer(queue);
        Consumer consumer1 = new Consumer(queue);
        Consumer consumer2 = new Consumer(queue);
        Consumer consumer3 = new Consumer(queue);
        // Start everything.
        service.execute(producer1);
        service.execute(producer2);
        service.execute(producer3);
        service.execute(consumer1);
        service.execute(consumer2);
        service.execute(consumer3);
        // Let the demo run, then stop the producers.
        Thread.sleep(10 * 1000);
        producer1.stop();
        producer2.stop();
        producer3.stop();
        // Give the consumers time to work through what is still queued.
        Thread.sleep(3000);
        // shutdown() only rejects new tasks; the consumers would stay blocked in
        // take() (WAITING) forever and keep the JVM alive. shutdownNow() interrupts
        // the workers, which is the only way out of the consumers' loop.
        service.shutdownNow();
        if (!service.awaitTermination(5, java.util.concurrent.TimeUnit.SECONDS)) {
            System.out.println("workers did not terminate in time");
        }
    }
}
使用Disruptor實現
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.4.2</version>
</dependency>
/**
 * Mutable event object holding a single long; written through {@link #set(long)}
 * and read through {@link #get()}.
 */
public class PCData {
    /** Current payload; starts at the default long value of zero. */
    private long payload;

    /**
     * Replaces the stored value.
     *
     * @param value the new value to store
     */
    public void set(long value) {
        payload = value;
    }

    /**
     * @return the most recently stored value
     */
    public long get() {
        return payload;
    }
}
import com.lmax.disruptor.EventFactory;
/**
 * Event factory handed to the Disruptor. Per the original author's note, the
 * Disruptor pre-allocates its buffer and uses this factory at initialization
 * time to construct every object instance stored in it.
 */
public class PCDataFactory implements EventFactory<PCData> {

    /**
     * @return a fresh, empty {@link PCData} instance
     */
    @Override
    public PCData newInstance() {
        final PCData blankEvent = new PCData();
        return blankEvent;
    }
}
import com.lmax.disruptor.WorkHandler;
import java.text.MessageFormat;
/**
 * Work handler that prints the square of each event's value, prefixed with the
 * id of the thread that handled it.
 */
public class Consumer implements WorkHandler<PCData> {

    /**
     * Handles one event; only the data processing itself lives here.
     *
     * @param pcData the event to process
     * @throws Exception declared by the handler interface; not thrown explicitly here
     */
    @Override
    public void onEvent(PCData pcData) throws Exception {
        final long threadId = Thread.currentThread().getId();
        final long value = pcData.get();
        final long squared = value * value;
        System.out.println(
                MessageFormat.format("{0}:Event: --{1}--", threadId, squared));
    }
}
import com.lmax.disruptor.RingBuffer;
import java.nio.ByteBuffer;
/**
 * Publishes long values into a {@link RingBuffer} of {@link PCData} events.
 */
public class Producer {
    /** Ring buffer that events are written to and published on. */
    private final RingBuffer<PCData> ringBuffer;

    /**
     * @param ringBuffer the ring buffer to publish into
     */
    public Producer(RingBuffer<PCData> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    /**
     * Copies the long stored at index 0 of {@code bb} into the next ring-buffer
     * slot and publishes that slot.
     *
     * @param bb buffer whose first eight bytes hold the value to publish
     */
    public void pushData(ByteBuffer bb) {
        // Claim the next sequence number.
        final long slot = ringBuffer.next();
        try {
            // Fill the event that sits at the claimed sequence.
            ringBuffer.get(slot).set(bb.getLong(0));
        } finally {
            // Always publish the claimed sequence, even if filling the event failed.
            ringBuffer.publish(slot);
        }
    }
}
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import java.nio.ByteBuffer;
import java.util.concurrent.Executors;
/**
 * Demo driver for the Disruptor variant: wires four consumers into a worker
 * pool and then publishes an increasing counter every 100 ms, forever.
 */
public class TestClient {
    /**
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while sleeping
     */
    public static void main(String[] args) throws InterruptedException {
        // Original author's note: the ring buffer length must be a power of two.
        final int ringBufferSize = 4;
        final PCDataFactory eventFactory = new PCDataFactory();
        Disruptor<PCData> disruptor = new Disruptor<PCData>(
                eventFactory,
                ringBufferSize,
                Executors.defaultThreadFactory(),
                ProducerType.MULTI,
                new BlockingWaitStrategy());

        // Original author's note: each consumer is mapped onto its own thread.
        disruptor.handleEventsWithWorkerPool(
                new Consumer(), new Consumer(), new Consumer(), new Consumer());

        // The Disruptor must be started before anything is published.
        disruptor.start();

        Producer producer = new Producer(disruptor.getRingBuffer());
        ByteBuffer bb = ByteBuffer.allocate(8);
        int i = 0;
        while (true) {
            Thread.sleep(100);
            // Write the counter at index 0 and hand the same buffer to the producer.
            producer.pushData(bb.putLong(0, i));
            i++;
        }
    }
}