WebFlux 是原生的發(fā)布訂閱工具励烦,可以很方便的構(gòu)建事件總線牙肝。下面是一個監(jiān)聽數(shù)據(jù)變動的監(jiān)聽器:
package com.example.demo;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
/**
* 數(shù)據(jù)監(jiān)聽器
*
* @author <a href="mailto:pushu@2dfire.com">樸樹</a>
* @date 2019-07-01 17:32
*/
public class ReactorDataMonitor {
private static final Map<Class, FluxSink> handlers = new ConcurrentHashMap<>();
/**
* 監(jiān)控指定類型的數(shù)據(jù)
*
* @param clz 數(shù)據(jù)類型
* @param handler 數(shù)據(jù)消費方式
*/
public static void monitor(Class clz, Consumer handler) {
Flux<Object> objectFlux = Flux.create(sink -> {
handlers.put(clz, sink);
sink.onCancel(() -> handlers.remove(clz));
}, FluxSink.OverflowStrategy.LATEST);
objectFlux.subscribe(handler);
}
/**
* 取消監(jiān)控數(shù)據(jù)
*
* @param clz 數(shù)據(jù)類型
*/
public static void unMonitor(Class clz) {
handlers.remove(clz);
}
/**
* 發(fā)布數(shù)據(jù)
*
* @param object
*/
public static void publish(Object object) {
handlers.forEach((key, value) -> {
if (key.equals(object.getClass())) {
value.next(object);
}
});
}
}
以上代碼中
FluxSink
是一個可以持續(xù)發(fā)布數(shù)據(jù)的數(shù)據(jù)源。