1. jar 導入
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.4.4</version>
</dependency>
2. 實現代碼
2.1 消息體
import lombok.Data;
/**
 * Event type stored in the Disruptor ring buffer and handed to the consumers.
 * Lombok's {@code @Data} supplies the accessors used by the producer (e.g. {@code setData}).
 */
@Data
public class Message {
/** Text payload of the message. */
private String data;
}
2.1 生產者
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
/**
 * Spring component that publishes {@link Message} events onto the Disruptor ring buffer.
 */
@Component
@RequiredArgsConstructor
public class Producer {

    /** Disruptor whose ring buffer receives the published messages (typed, not raw). */
    private final Disruptor<Message> disruptor;

    /**
     * Publishes one message carrying the given payload.
     *
     * @param data payload to copy into the next free ring-buffer slot
     */
    public void send(String data) {
        RingBuffer<Message> ringBuffer = disruptor.getRingBuffer();
        // publishEvent claims the next sequence, lets the translator fill the
        // preallocated slot, and publishes the sequence even if the translator throws.
        // The lambda captures nothing, so no per-call translator object is needed.
        ringBuffer.publishEvent((event, sequence, payload) -> event.setData(payload), data);
    }
}
2.2 繼承WorkHandler 消費(fèi)者
import com.lmax.disruptor.WorkHandler;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.example.disruptor.Message;
/**
 * Consumer for worker-pool mode, implemented via {@link WorkHandler}.
 */
@Slf4j
@RequiredArgsConstructor
public class WorkConsumer implements WorkHandler<Message> {

    /** Log layout: handler number first, then the received message. */
    private static final String LOG_FORMAT = "work 接收到了消息 編號(hào) : {}, message: {}";

    /** Number identifying this handler in the log output. */
    private final Integer number;

    /**
     * Logs the received message together with this handler's number.
     * In worker-pool mode each published message is consumed by only one of the handlers.
     *
     * @param message the message taken from the ring buffer
     */
    @Override
    public void onEvent(Message message) {
        log.info(LOG_FORMAT, number, message);
    }
}
2.2 繼承EventHandler 消費(fèi)者
import com.lmax.disruptor.EventHandler;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.example.disruptor.Message;
/**
 * Consumer for event-handler mode, implemented via {@link EventHandler}.
 */
@Slf4j
@RequiredArgsConstructor
public class EventConsumer implements EventHandler<Message> {

    /** Log layout: handler number, message, current sequence, end-of-batch flag. */
    private static final String LOG_FORMAT =
            "Repeat 接收到了消息 編號(hào) : {}, message: {}, curr sequence: {}, is end: {}";

    /** Number identifying this handler in the log output. */
    private final Integer number;

    /**
     * Logs the received message together with this handler's number.
     * In event-handler mode every handler sees every published message.
     *
     * @param message    the message taken from the ring buffer
     * @param sequence   sequence number of the message being processed
     * @param endOfBatch whether this message is the last one of the current batch
     */
    @Override
    public void onEvent(Message message, long sequence, boolean endOfBatch) {
        log.info(LOG_FORMAT, number, message, sequence, endOfBatch);
    }
}
2.3 Disruptor 容器初始化
import com.lmax.disruptor.dsl.Disruptor;
import org.example.disruptor.Message;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.Executors;
/**
 * Spring configuration that builds the {@link Disruptor}, wires its consumers and starts it.
 */
@Configuration
public class ConsumerConfig {

    /** Ring buffer capacity in slots; the Disruptor requires a power of two. */
    private static final int BUFFER_SIZE = 1024 * 1024;

    /**
     * Creates and starts the Disruptor with a chained consumer topology a -> b -> c.
     * The destroy method is declared explicitly so that closing the Spring context
     * shuts the Disruptor down without relying on destroy-method inference.
     *
     * @return the started Disruptor
     */
    @Bean(destroyMethod = "shutdown")
    public Disruptor<Message> disruptor() {
        // Diamond instead of the raw type keeps the construction type-checked.
        Disruptor<Message> disruptor =
                new Disruptor<>(Message::new, BUFFER_SIZE, Executors.defaultThreadFactory());

        // Alternative: worker-pool consumption (each message goes to one handler)
        // WorkConsumer w1 = new WorkConsumer(1);
        // WorkConsumer w2 = new WorkConsumer(2);
        // WorkConsumer w3 = new WorkConsumer(3);
        // WorkConsumer w4 = new WorkConsumer(4);
        // disruptor.handleEventsWithWorkerPool(w1, w2, w3, w4);

        // Alternative: event consumption (every handler receives every message)
        // EventConsumer a = new EventConsumer(1);
        // EventConsumer b = new EventConsumer(2);
        // disruptor.handleEventsWith(a, b);

        // Chained consumption: a -> b -> c
        EventConsumer a = new EventConsumer(1);
        EventConsumer b = new EventConsumer(2);
        EventConsumer c = new EventConsumer(3);
        disruptor.handleEventsWith(a).then(b).then(c);

        disruptor.start();
        return disruptor;
    }
}
2.4 Controller
import org.example.disruptor.Producer;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
/**
 * REST controller that pushes test messages through the {@link Producer}.
 */
@RestController
public class DisruptorController {

    /** Producer used to publish the messages. */
    @Resource
    private Producer producer;

    /**
     * Handles {@code GET /send}: publishes the given message twice,
     * suffixed with 0 and 1 respectively.
     *
     * @param message text to publish
     * @return the literal string {@code "success"}
     */
    @GetMapping("/send")
    public String add(String message) throws Exception {
        int index = 0;
        while (index < 2) {
            String payload = message + index;
            producer.send(payload);
            index++;
        }
        return "success";
    }
}
最后編輯于 :
©著作權歸作者所有，轉載或內容合作請聯繫作者