前言
什么是請(qǐng)求聚合
見名之意就是將多次的請(qǐng)求整合為一個(gè)請(qǐng)求處理
如何實(shí)現(xiàn)請(qǐng)求聚合
有個(gè)快手大佬開源了一個(gè)工具類:buffer-trigger鳍怨,這玩意就可以用來做請(qǐng)求聚合。
buffer-trigger適用場(chǎng)景
高吞吐量消息處理: 當(dāng)系統(tǒng)需要處理大量快速產(chǎn)生的數(shù)據(jù)或消息時(shí)醇蝴,如日志記錄、事件追蹤绪商、實(shí)時(shí)交易數(shù)據(jù)等坛掠,單條消息的即時(shí)處理可能會(huì)導(dǎo)致過多的系統(tǒng)開銷(如網(wǎng)絡(luò)通信、數(shù)據(jù)庫操作等)盗忱。通過使用BufferTrigger酱床,可以將這些消息暫時(shí)緩存在阻塞隊(duì)列中,累積到一定數(shù)量后一次性進(jìn)行批量處理趟佃。這樣既能減少系統(tǒng)調(diào)用次數(shù)扇谣,提升整體處理效率昧捷,又能降低對(duì)下游系統(tǒng)的瞬時(shí)壓力。
延遲敏感但允許適度延后處理: 在某些業(yè)務(wù)場(chǎng)景中罐寨,數(shù)據(jù)或消息的處理雖有一定的時(shí)效性要求靡挥,但并不嚴(yán)格到需要立即響應(yīng)。例如衩茸,用戶行為分析芹血、運(yùn)營統(tǒng)計(jì)報(bào)表生成等任務(wù),可以在容忍的時(shí)間窗口內(nèi)完成楞慈。BufferTrigger通過設(shè)置批處理閾值和延遲等待時(shí)間幔烛,允許在滿足一定積累量或等待時(shí)間后才觸發(fā)消費(fèi),從而實(shí)現(xiàn)數(shù)據(jù)的“準(zhǔn)實(shí)時(shí)”處理囊蓝,兼顧了處理效率與延遲需求饿悬。
資源優(yōu)化與成本控制: 對(duì)于依賴付費(fèi)服務(wù)(如云存儲(chǔ)、API調(diào)用)或者計(jì)算資源有限的情況聚霜,批量處理能夠顯著減少對(duì)外部服務(wù)的調(diào)用量或內(nèi)部計(jì)算資源的占用狡恬。例如,定期向云存儲(chǔ)批量上傳日志文件蝎宇、批量發(fā)送電子郵件通知弟劲、批量查詢外部API并聚合結(jié)果等。BufferTrigger通過合并多個(gè)小任務(wù)為一個(gè)大任務(wù)姥芥,有助于降低單位數(shù)據(jù)處理的成本兔乞。
避免頻繁IO操作: 若消息的消費(fèi)涉及大量的磁盤IO、網(wǎng)絡(luò)IO或其他昂貴的系統(tǒng)資源操作凉唐,如數(shù)據(jù)庫寫入庸追、文件寫入、跨網(wǎng)絡(luò)的數(shù)據(jù)同步等台囱,頻繁的單個(gè)操作可能導(dǎo)致性能瓶頸淡溯。BufferTrigger通過批量處理,能夠減少這類操作的次數(shù)簿训,從而提高系統(tǒng)整體性能咱娶。
微服務(wù)間解耦與流量控制: 在分布式微服務(wù)架構(gòu)中,不同服務(wù)之間可能存在強(qiáng)依賴關(guān)系强品。使用BufferTrigger可以在服務(wù)間引入一層緩沖豺总,避免下游服務(wù)瞬時(shí)過載或臨時(shí)不可用導(dǎo)致整個(gè)系統(tǒng)崩潰。同時(shí)择懂,批量處理能平滑消費(fèi)端的請(qǐng)求流量,減輕對(duì)上游服務(wù)的壓力另玖,增強(qiáng)系統(tǒng)的穩(wěn)定性和容錯(cuò)能力困曙。
如何使用buffer-trigger
1表伦、在項(xiàng)目的pom中引入buffer-trigger GAV
<dependency>
<groupId>com.github.phantomthief</groupId>
<artifactId>buffer-trigger</artifactId>
<version>0.2.9</version>
</dependency>
2、使用案例一:使用SimpleBufferTrigger
/**
* {@link BufferTrigger}的通用實(shí)現(xiàn)慷丽,適合大多數(shù)業(yè)務(wù)場(chǎng)景
* <p>
* 消費(fèi)觸發(fā)策略會(huì)考慮消費(fèi)回調(diào)函數(shù)的執(zhí)行時(shí)間蹦哼,實(shí)際執(zhí)行間隔 = 理論執(zhí)行間隔 - 消費(fèi)回調(diào)函數(shù)執(zhí)行時(shí)間;
* 如回調(diào)函數(shù)執(zhí)行時(shí)間已超過理論執(zhí)行間隔要糊,將立即執(zhí)行下一次消費(fèi)任務(wù).
*
* @author w.vela
*/
示例:
public class BufferTriggerDemo {
BufferTrigger<Long> bufferTrigger = BufferTrigger.<Long, Map<Long, AtomicInteger>> simple()
.maxBufferCount(10)
.interval(4, TimeUnit.SECONDS)
.setContainer(ConcurrentHashMap::new, (map, uid) -> {
map.computeIfAbsent(uid, key -> new AtomicInteger()).addAndGet(1);
return true;
})
.consumer(this::consumer)
.build();
public void consumer(Map<Long, AtomicInteger> map) {
System.out.println(map);
}
public void test() throws InterruptedException {
// 進(jìn)程退出時(shí)手動(dòng)消費(fèi)一次
Runtime.getRuntime().addShutdownHook(new Thread(() -> bufferTrigger.manuallyDoTrigger()));
// 最大容量是10纲熏,這里嘗試添加11個(gè)元素0-10
for (int i = 0; i < 5; i ++) {
for (long j = 0; j < 11; j ++) {
bufferTrigger.enqueue(j);
}
}
Thread.sleep(7000);
}
參數(shù)描述
-
maxBuffeCount(long count):
指定容器最大容量,比如這里指定了10锄俄,當(dāng)在下次聚合前容器元素?cái)?shù)量達(dá)到10就無法添加了局劲,-1表示無限制; - internal(longinterval, TimeUnit unit) :表示多久聚合一次奶赠,如果沒達(dá)到時(shí)間那么consumer是不會(huì)輸出的鱼填,聚合后容器就空了。
-
setContainer(Supplier<? extends C> factory, BiPredicate<? super C, ? super E> queueAdder):
第一個(gè)變量為factory毅戈,是個(gè)Supplier苹丸,獲取容器用的,要求線程安全;第二個(gè)變量是緩存更新的方法BiPredicate<?
super C, ? super E> queueAdder C為容器類型,E為元素類型 -
consumer(ThrowableConsumer<? super C, Throwable> consumer):
表示如何消費(fèi)聚合后的數(shù)據(jù)苇经,標(biāo)識(shí)我們?nèi)绾稳ハM(fèi)聚合后的數(shù)據(jù)赘理,我這里就是簡(jiǎn)單打印。 enqueue(E element): 添加元素扇单; - manuallyDoTrigger: 主動(dòng)觸發(fā)一次消費(fèi)商模,通常在java進(jìn)程關(guān)閉的時(shí)候調(diào)用
2、使用案例二:使用BatchConsumeBlockingQueueTrigger
/**
* {@link BufferTrigger}基于阻塞隊(duì)列的批量消費(fèi)觸發(fā)器實(shí)現(xiàn).
* <p>
* 該觸發(fā)器適合生產(chǎn)者-消費(fèi)者場(chǎng)景令花,緩存容器基于{@link LinkedBlockingQueue}隊(duì)列實(shí)現(xiàn).
* <p>
* 觸發(fā)策略類似Kafka linger阻桅,批處理閾值與延遲等待時(shí)間滿足其一即觸發(fā)消費(fèi)回調(diào).
* @author w.vela
*/
示例:
public class BufferTriggerDemo2 {
BufferTrigger<Long> bufferTrigger = BufferTrigger.<Long>batchBlocking()
.bufferSize(50)
.batchSize(10)
.linger(Duration.ofSeconds(1))
.setConsumerEx(this::consume)
.build();
private void consume(List<Long> nums) {
System.out.println(nums);
}
public void test() throws InterruptedException {
// 進(jìn)程退出時(shí)手動(dòng)消費(fèi)一次
Runtime.getRuntime().addShutdownHook(new Thread(() -> bufferTrigger.manuallyDoTrigger()));
for (long j = 0; j < 60; j ++) {
bufferTrigger.enqueue(j);
}
Thread.sleep(7000);
}
- batchBlocking():提供自帶背壓(back-pressure)的簡(jiǎn)單批量歸并消費(fèi)能力;
- bufferSize(intbufferSize): 緩存隊(duì)列的最大容量; batchSize(int size): 批處理元素的數(shù)量閾值兼都,達(dá)到這個(gè)數(shù)量后也會(huì)進(jìn)行消費(fèi)
- linger(Duration duration): 多久消費(fèi)一次
-
setConsumerEx(ThrowableConsumer<?
super List, Exception> consumer): 消費(fèi)函數(shù)嫂沉,注入的對(duì)象為緩存隊(duì)列中尚存的所有元素,非逐個(gè)元素消費(fèi)扮碧;
3趟章、兩種實(shí)現(xiàn)方式在使用上的區(qū)別
BatchConsumeBlockingQueueTrigger每次將元素原封不動(dòng)保存下來,然后一次性消費(fèi)一整個(gè)列表元素慎王。而SimpleBufferTrigger蚓土,每次添加元素都會(huì)進(jìn)行計(jì)算。
以上示例摘抄該博文https://juejin.cn/post/7160569936576774181
這篇文章比較詳細(xì)對(duì)請(qǐng)求聚合以及buffer-trigger進(jìn)行了介紹
更多buffer-trigger內(nèi)容可以看官方源碼注釋以及相應(yīng)的單元測(cè)試案例
https://github.com/PhantomThief/buffer-trigger
以上就是buffer-trigger的使用教程赖淤,不過如果只是寫到這邊蜀漆,就沒啥意思了,下面就以一個(gè)實(shí)戰(zhàn)的例子咱旱,來演示下如何實(shí)現(xiàn)請(qǐng)求聚合
案例
注: 以一個(gè)批量注冊(cè)用戶為例子确丢,來演示請(qǐng)求聚合绷耍。案例將buffer-trigger與springboot做了一個(gè)整合。案例只列出核心代碼鲜侥,完整示例查看文末demo鏈接
1褂始、項(xiàng)目中引入buffer-trigger GAV
<dependency>
<groupId>com.github.phantomthief</groupId>
<artifactId>buffer-trigger</artifactId>
<version>${buffer.trigger.version}</version>
</dependency>
2、封裝請(qǐng)求參數(shù)類
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class DataExchange<T,R> {
private String bizNo;
private T request;
private CompletableFuture<Result<R>> response;
}
3描函、封裝buffer-trigger處理類
@RequiredArgsConstructor
public class DelegateBatchConsumerTriggerHandler<T, R> implements BatchConsumerTriggerHandler<T, R>{
private final BufferTrigger<DataExchange<T, R>> bufferTrigger;
@SneakyThrows
@Override
public Result<R> handle(T request, String bizNo) {
DataExchange dataExchange = new DataExchange<>();
dataExchange.setBizNo(bizNo);
dataExchange.setRequest(request);
CompletableFuture<Result> response = new CompletableFuture<>();
dataExchange.setResponse(response);
bufferTrigger.enqueue(dataExchange);
return response.get();
}
@Override
public void closeBufferTrigger() {
// 觸發(fā)該事件崎苗,關(guān)閉BufferTrigger,并將未消費(fèi)的數(shù)據(jù)消費(fèi)
if(bufferTrigger != null){
bufferTrigger.close();
}
}
}
4舀寓、封裝buffer-trigger創(chuàng)建工廠
public interface BatchConsumerTriggerFactory {
default <T,R> BatchConsumerTriggerBuilder<DataExchange<T,R>> builder(){
return null;
}
default <T,R> BufferTrigger<DataExchange<T,R>> getTrigger(ThrowableConsumer<List<DataExchange<T,R>>, Exception> consumer, String bufferTriggerBizType){
if(!support(bufferTriggerBizType)){
return null;
}
return builder().setConsumerEx(consumer).build();
}
boolean support(String bufferTriggerBizType);
default <T,R> BatchConsumerTriggerHandler<T,R> getTriggerHandler(ThrowableConsumer<List<DataExchange<T,R>>, Exception> consumer, String bufferTriggerBizType){
BufferTrigger<DataExchange<T, R>> trigger = getTrigger(consumer, bufferTriggerBizType);
return new DelegateBatchConsumerTriggerHandler<>(trigger);
}
}
5胆数、模擬用戶注冊(cè)dao
@Repository
public class UserDao {
private final Map<Long, User> userMap = new ConcurrentHashMap<>();
private final ThreadLocalRandom random = ThreadLocalRandom.current();
private final LongAdder idAdder = new LongAdder();
public User register(UserDTO userDTO){
mockExecuteCostTime();
return getUser(userDTO);
}
public List<User> batchRegister(List<UserDTO> userDTOs){
mockExecuteCostTime();
List<User> users = new ArrayList<>();
userDTOs.forEach(userDTO -> users.add(getUser(userDTO)));
return users;
}
6、模擬用戶注冊(cè)service
a基公、 常規(guī)方式
@Service
@RequiredArgsConstructor
public class UserServiceImpl implements UserService {
private final UserDao userDao;
private final LongAdder count = new LongAdder();
@Override
public Result<User> register(UserDTO user) {
count.increment();
System.out.println("執(zhí)行次數(shù):" + count.sum());
return Result.success(userDao.register(user));
}
}
b障癌、 請(qǐng)求聚合方式
前置條件: 需在yml指定相關(guān)隊(duì)列督赤、定時(shí)器配置以及業(yè)務(wù)類別
lybgeek:
buffer:
trigger:
consume-queue-trigger-properties:
- bufferTriggerBizType: userReisgeter
config:
batchSize: 100
bufferSize: 1000
batchConsumeIntervalMills: 1000
@Service
@RequiredArgsConstructor
public class UserServiceBufferTriggerImpl implements UserService, InitializingBean, DisposableBean {
public static final String BUFFER_TRIGGER_BIZ_TYPE = "userReisgeter";
private final UserDao userDao;
private final BatchConsumerTriggerFactory batchConsumerTriggerFactory;
private BatchConsumerTriggerHandler<UserDTO,User> batchConsumerTriggerHandler;
private final LongAdder count = new LongAdder();
@SneakyThrows
@Override
public Result<User> register(UserDTO user) {
return batchConsumerTriggerHandler.handle(user,BUFFER_TRIGGER_BIZ_TYPE + "-" + UUID.randomUUID());
}
@Override
public void afterPropertiesSet() throws Exception {
// key為業(yè)務(wù)屬性唯一鍵,如果不存在業(yè)務(wù)屬性唯一鍵,則可以取bizNo作為key台汇,示例以u(píng)sername作為唯一鍵
Map<String, CompletableFuture<Result<User>>> completableFutureMap = new HashMap<>();
batchConsumerTriggerHandler = batchConsumerTriggerFactory.getTriggerHandler((ThrowableConsumer<List<DataExchange<UserDTO, User>>, Exception>) dataExchanges -> {
List<UserDTO> userDTOs = new ArrayList<>();
for (DataExchange<UserDTO, User> dataExchange : dataExchanges) {
UserDTO userDTO = dataExchange.getRequest();
completableFutureMap.put(userDTO.getUsername(),dataExchange.getResponse());
userDTOs.add(userDTO);
}
count.increment();
System.out.println("執(zhí)行次數(shù):" + count.sum());
List<User> users = userDao.batchRegister(userDTOs);
if(CollectionUtil.isNotEmpty(users)){
for (User user : users) {
CompletableFuture<Result<User>> completableFuture = completableFutureMap.remove(user.getUsername());
if(completableFuture != null){
completableFuture.complete(Result.success(user));
}
}
}
},BUFFER_TRIGGER_BIZ_TYPE);
}
@Override
public void destroy() throws Exception {
// 觸發(fā)該事件咖杂,關(guān)閉BufferTrigger乞巧,并將未消費(fèi)的數(shù)據(jù)消費(fèi)
batchConsumerTriggerHandler.closeBufferTrigger();
}
}
7洋访、分別開啟20個(gè)線程,對(duì)常規(guī)方式以及聚合方式的service進(jìn)行測(cè)試
a斑司、 常規(guī)方式
@Test
public void testRegisterUserByCommon() throws IOException {
new ConcurrentCall(20).run(()->{
UserDTO user = UserUtil.generateUser();
return userServiceImpl.register(user);
});
}
控制臺(tái)輸出
執(zhí)行次數(shù):1
執(zhí)行次數(shù):2
執(zhí)行次數(shù):7
執(zhí)行次數(shù):6
執(zhí)行次數(shù):10
執(zhí)行次數(shù):9
執(zhí)行次數(shù):5
執(zhí)行次數(shù):4
執(zhí)行次數(shù):11
執(zhí)行次數(shù):12
執(zhí)行次數(shù):3
執(zhí)行次數(shù):8
執(zhí)行次數(shù):17
執(zhí)行次數(shù):16
執(zhí)行次數(shù):15
執(zhí)行次數(shù):18
執(zhí)行次數(shù):14
執(zhí)行次數(shù):20
執(zhí)行次數(shù):13
執(zhí)行次數(shù):19
Result(code=200, msg=success, data=User(id=1, username=yangweize, fullname=楊偉澤, age=12, email=yangweize@qq.com, mobile=64294835455))
Result(code=200, msg=success, data=User(id=3, username=yaojinpeng, fullname=姚晉鵬, age=13, email=yaojinpeng@qq.com, mobile=5381-03836251))
Result(code=200, msg=success, data=User(id=9, username=pengxiaoran, fullname=彭瀟然, age=25, email=pengxiaoran@qq.com, mobile=903-85787160))
Result(code=200, msg=success, data=User(id=9, username=guoweize, fullname=郭偉澤, age=9, email=guoweize@qq.com, mobile=57105382845))
Result(code=200, msg=success, data=User(id=8, username=huangjinyu, fullname=黃瑾瑜, age=29, email=huangjinyu@qq.com, mobile=449-27085386))
Result(code=200, msg=success, data=User(id=6, username=renkairui, fullname=任楷瑞, age=3, email=renkairui@qq.com, mobile=2777-67842072))
Result(code=200, msg=success, data=User(id=2, username=fuhaoran, fullname=傅昊然, age=15, email=fuhaoran@qq.com, mobile=332-47390793))
Result(code=200, msg=success, data=User(id=5, username=linmingxuan, fullname=林明軒, age=27, email=linmingxuan@qq.com, mobile=116-31209336))
Result(code=200, msg=success, data=User(id=5, username=shensicong, fullname=沈思聰, age=6, email=shensicong@qq.com, mobile=0532-05033168))
Result(code=200, msg=success, data=User(id=11, username=gongtianyu, fullname=龔天宇, age=4, email=gongtianyu@qq.com, mobile=9752-26976731))
Result(code=200, msg=success, data=User(id=13, username=xiongminghui, fullname=熊明輝, age=23, email=xiongminghui@qq.com, mobile=0049-21709250))
Result(code=200, msg=success, data=User(id=17, username=huzhize, fullname=胡志澤, age=0, email=huzhize@qq.com, mobile=760-85426527))
Result(code=200, msg=success, data=User(id=16, username=gaosiyuan, fullname=高思源, age=5, email=gaosiyuan@qq.com, mobile=42452304656))
Result(code=200, msg=success, data=User(id=13, username=mojiaxi, fullname=莫嘉熙, age=2, email=mojiaxi@qq.com, mobile=7264-82263592))
Result(code=200, msg=success, data=User(id=18, username=caizimo, fullname=蔡子默, age=12, email=caizimo@qq.com, mobile=2653-82403850))
Result(code=200, msg=success, data=User(id=10, username=wancongjian, fullname=萬聰健, age=10, email=wancongjian@qq.com, mobile=954-37654583))
Result(code=200, msg=success, data=User(id=14, username=gongyuebin, fullname=龔越彬, age=0, email=gongyuebin@qq.com, mobile=77884047173))
Result(code=200, msg=success, data=User(id=15, username=fenghongtao, fullname=馮鴻濤, age=2, email=fenghongtao@qq.com, mobile=8832-09658213))
Result(code=200, msg=success, data=User(id=19, username=jiangyuanbo, fullname=江苑博, age=12, email=jiangyuanbo@qq.com, mobile=2132-90700641))
Result(code=200, msg=success, data=User(id=20, username=xiaoxinlei, fullname=蕭鑫磊, age=13, email=xiaoxinlei@qq.com, mobile=02196775183))
b渗饮、 聚合請(qǐng)求方式
@Test
public void testRegisterUserByBufferTrigger() throws IOException {
new ConcurrentCall(20).run(()->{
UserDTO user = UserUtil.generateUser();
return userServiceBufferTriggerImpl.register(user);
});
}
控制臺(tái)輸出
執(zhí)行次數(shù):1
Result(code=200, msg=success, data=User(id=1, username=heguo, fullname=何果, age=10, email=heguo@qq.com, mobile=5725-06130005))
Result(code=200, msg=success, data=User(id=7, username=houwen, fullname=侯文, age=9, email=houwen@qq.com, mobile=85830365362))
Result(code=200, msg=success, data=User(id=11, username=yangxiaoyu, fullname=楊笑愚, age=5, email=yangxiaoyu@qq.com, mobile=13776594491))
Result(code=200, msg=success, data=User(id=3, username=yusimiao, fullname=余思淼, age=5, email=yusimiao@qq.com, mobile=070-18231344))
Result(code=200, msg=success, data=User(id=12, username=haotianyu, fullname=郝天宇, age=10, email=haotianyu@qq.com, mobile=42693432247))
Result(code=200, msg=success, data=User(id=14, username=wangxinpeng, fullname=汪鑫鵬, age=1, email=wangxinpeng@qq.com, mobile=59660609063))
Result(code=200, msg=success, data=User(id=15, username=tanzhichen, fullname=覃智宸, age=25, email=tanzhichen@qq.com, mobile=075-00624335))
Result(code=200, msg=success, data=User(id=4, username=lu:haoxuan, fullname=呂皓軒, age=14, email=lu:haoxuan@qq.com, mobile=9548-30583153))
Result(code=200, msg=success, data=User(id=2, username=qiuyinxiang, fullname=邱胤祥, age=18, email=qiuyinxiang@qq.com, mobile=04148786960))
Result(code=200, msg=success, data=User(id=5, username=weiweicheng, fullname=魏偉誠, age=25, email=weiweicheng@qq.com, mobile=0960-77489940))
Result(code=200, msg=success, data=User(id=20, username=tanbin, fullname=譚彬, age=27, email=tanbin@qq.com, mobile=297-57401738))
Result(code=200, msg=success, data=User(id=18, username=husiyuan, fullname=胡思遠(yuǎn), age=24, email=husiyuan@qq.com, mobile=0809-08658163))
Result(code=200, msg=success, data=User(id=16, username=shishengrui, fullname=石晟睿, age=26, email=shishengrui@qq.com, mobile=8205-70004359))
Result(code=200, msg=success, data=User(id=17, username=lu:zihan, fullname=呂子涵, age=0, email=lu:zihan@qq.com, mobile=162-35081974))
Result(code=200, msg=success, data=User(id=19, username=xionghaoran, fullname=熊昊然, age=19, email=xionghaoran@qq.com, mobile=588-09693393))
Result(code=200, msg=success, data=User(id=13, username=jiangyuebin, fullname=姜越彬, age=19, email=jiangyuebin@qq.com, mobile=472-74492380))
Result(code=200, msg=success, data=User(id=8, username=haoweicheng, fullname=郝偉誠, age=26, email=haoweicheng@qq.com, mobile=73205366322))
Result(code=200, msg=success, data=User(id=10, username=tanhongxuan, fullname=譚鴻煊, age=18, email=tanhongxuan@qq.com, mobile=78536254981))
Result(code=200, msg=success, data=User(id=9, username=xielicheng, fullname=謝立誠, age=18, email=xielicheng@qq.com, mobile=4364-05053591))
Result(code=200, msg=success, data=User(id=6, username=weiluyang, fullname=韋鷺洋, age=28, email=weiluyang@qq.com, mobile=92876761170))
c、 結(jié)果分析
常規(guī)方式需要調(diào)用20次宿刮,將結(jié)果返回互站。聚合方式僅需調(diào)用一次,就將結(jié)果返回
總結(jié)
本文主要講解如何進(jìn)行請(qǐng)求聚合僵缺,請(qǐng)求聚合主要適用于那些需要高效胡桃、批量處理數(shù)據(jù)或消息,并且對(duì)處理延遲有一定容忍度的場(chǎng)景磕潮。
我們?cè)谑褂谜?qǐng)求聚合時(shí)翠胰,相關(guān)的下游最好能提供批量接口
其次BufferTrigger是單線程消費(fèi),在并發(fā)很高的場(chǎng)景下可能會(huì)出現(xiàn)消費(fèi)速度跟不上生產(chǎn)速度,這很容易導(dǎo)致full gc問題自脯。所以如果有必要的話需要使用線程池來提升消費(fèi)速度
demo鏈接
https://github.com/lyb-geek/springboot-learning/tree/master/springboot-buffer-trigger