背景
C# 版本庫(kù) MediatR 是一個(gè)中介者模式實(shí)現(xiàn)類庫(kù)宫莱,其核心是一個(gè)中介 者模式的.NET實(shí)現(xiàn)辉词,其目的是消息發(fā)送和消息處理的解耦矾瑰。它支持單播和多播形式使用同步或異步的模式來(lái)發(fā)布消息,創(chuàng)建和幀聽事件贼陶。
java中沒(méi)有找到類似類庫(kù),在對(duì)MediatR源碼閱讀中欣福,發(fā)現(xiàn)其主要思路是借助IOC獲取Request與Handler對(duì)應(yīng)關(guān)系并進(jìn)行處理绢片。
中介者模式
中介者模式:用一個(gè)中介對(duì)象封裝一系列的對(duì)象交互,中介者使各對(duì)象不需要顯示地相互作用畏陕,從而使耦合松散配乓,而且可以獨(dú)立地改變他們之間的交互。
使用中介模式,對(duì)象之間的交互將封裝在中介對(duì)象中犹芹,對(duì)象不再直接交互(解耦),而是通過(guò)中介進(jìn)行交互崎页,這減少了對(duì)象之間的依賴性,從而減少了耦合腰埂。
應(yīng)用
單播消息傳輸
單播消息傳輸飒焦,也就是一對(duì)第一的消息傳遞,一個(gè)消息對(duì)應(yīng)一個(gè)消息處理屿笼,通過(guò) IReust
抽象單播消息牺荠,使用 IRequestHandler
進(jìn)行消息處理
@ExtendWith(SpringExtension.class)
@Import(
value = {
Mediator.class,
PingPongTest.PingHandler.class,
}
)
public class PingPongTest {
@Autowired
IMediator mediator;
@Test
public void should() {
String send = mediator.send(new Ping());
assertThat(send).isNotNull();
assertThat(send).isEqualTo("Pong");
}
public static class Ping implements IRequest<String> {
}
public static class PingHandler implements IRequestHandler<Ping, String> {
@Override
public String handle(Ping request) {
return "Pong";
}
}
}
多播消息傳輸
多播消息傳輸,是一對(duì)多的消息傳遞驴一,一個(gè)消息對(duì)應(yīng)多個(gè)消息處理休雌,通過(guò) INotification
抽象多播消息,使用 INotificationHanlder
進(jìn)行消息處理
@ExtendWith(SpringExtension.class)
@Import(
value = {
Mediator.class,
PingNoticeTests.Pong1.class,
PingNoticeTests.Pong2.class,
}
)
public class PingNoticeTests {
@Autowired
IMediator mediator;
@Autowired
Pong1 pong1;
@Autowired
Pong2 pong2;
@Test
public void should() {
mediator.publish(new Ping());
assertThat(pong1.getCode()).isEqualTo("Pon1");
assertThat(pong2.getCode()).isEqualTo("Pon2");
}
public static class Ping implements INotification {
}
public static class Pong1 implements INotificationHandler<Ping> {
private String code;
public String getCode() {
return code;
}
@Override
public void handle(Ping notification) {
this.code = "Pon1";
}
}
public static class Pong2 implements INotificationHandler<Ping> {
private String code;
public String getCode() {
return code;
}
@Override
public void handle(Ping notification) {
this.code = "Pon2";
}
}
}
實(shí)現(xiàn)
核心實(shí)現(xiàn)
其主要點(diǎn)是從Spring的ApplicationContext中獲取相關(guān)接口bean肝断,然會(huì)執(zhí)行bean方法杈曲。
核心方法有兩個(gè):public(多播)和send(單播)。
借助ResolvableType類型構(gòu)造解析bean信息孝情,得到信息后從spring中獲取對(duì)象實(shí)例鱼蝉。
/**
* 中介者實(shí)現(xiàn)類
* <p>
* 依賴 ApplicationContext
*/
@Component
public class Mediator implements IMediator, ApplicationContextAware {
private ApplicationContext context;
/**
* 發(fā)布同步
* <p>
* 根據(jù)通知類型和INotificationHandler,從ApplicationContext獲取Handler的BeanNames,
* 將 BeanNames 轉(zhuǎn)化為 INotificationHandler 的實(shí)例,每個(gè)實(shí)例調(diào)用一次handler
*
* @param notification 通知內(nèi)容
* @param <TNotification> 通知類型
*/
@Override
public <TNotification extends INotification> void publish(TNotification notification) {
ResolvableType handlerType = ResolvableType.forClassWithGenerics(
INotificationHandler.class, notification.getClass());
String[] beanNamesForType = this.context.getBeanNamesForType(handlerType);
List<INotificationHandler<TNotification>> list = new ArrayList<>();
for (String beanBane :
beanNamesForType) {
list.add((INotificationHandler<TNotification>) this.context.getBean(beanBane));
}
list.forEach(h -> h.handle(notification));
}
/**
* 發(fā)送求取
* <p>
* 根據(jù)request類型箫荡,獲取到response類型魁亦,
* 根據(jù)IRequestHandler、request類型羔挡、response類型從ApplicationContext獲取
* IRequestHandler實(shí)例列表洁奈,取第一個(gè)實(shí)例執(zhí)行handler方法。
* <p>
* <p>
* 如果為找到handler實(shí)例绞灼,拋出NoRequestHandlerException異常
*
* @param request 請(qǐng)求
* @param <TResponse> 響應(yīng)類型
* @return 響應(yīng)結(jié)果
*/
@Override
public <TResponse> TResponse send(IRequest<TResponse> request) {
Type[] genericInterfaces = request.getClass().getGenericInterfaces();
Type responseType = null;
for (Type type : genericInterfaces) {
if ((type instanceof ParameterizedType)) {
ParameterizedType parameterizedType = (ParameterizedType) type;
if (!parameterizedType.getRawType().equals(IRequest.class)) {
continue;
}
responseType = parameterizedType.getActualTypeArguments()[0];
break;
}
}
if (responseType == null) {
// 拋出異常
throw new NoRequestHandlerException(request.getClass());
}
Class<?> requestClass = request.getClass();
Class<?> responseClass = (Class<?>) responseType;
ResolvableType handlerType = ResolvableType.forClassWithGenerics(
IRequestHandler.class,
requestClass,
responseClass);
String[] beanNamesForType = this.context.getBeanNamesForType(handlerType);
List<IRequestHandler<IRequest<TResponse>, TResponse>> list = new ArrayList<>();
for (String beanBane :
beanNamesForType) {
list.add((IRequestHandler<IRequest<TResponse>, TResponse>) this.context.getBean(beanBane));
}
if (list.isEmpty()) {
throw new NoRequestHandlerException(request.getClass());
}
return list.stream()
.findFirst()
.map(h -> h.handle(request))
.orElseThrow(() -> new NoRequestHandlerException(request.getClass()));
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.context = applicationContext;
}
}
public interface IBaseRequest {
}
多播接口
public interface INotification {
}
單播接口
public interface IRequest<TResponse> extends IBaseRequest {
}
public interface IPublisher {
<TNotification extends INotification> void publish(TNotification notification);
}
public interface ISender {
<TResponse> TResponse send(IRequest<TResponse> request);
}
public interface IMediator extends ISender, IPublisher {
}
多播處理接口
public interface INotificationHandler<TNotification extends INotification> {
void handle(TNotification notification);
}
單播處理接口
public interface IRequestHandler<TRequest extends IRequest<TResponse>, TResponse> {
TResponse handle(TRequest request);
}
public abstract class AbsRequestHandler<TRequest extends IRequest<TResponse>, TResponse>
implements IRequestHandler<TRequest, TResponse> {
@Override
public abstract TResponse handle(TRequest request);
}
public abstract class AbsNotificationHandler<TNotification extends INotification>
implements INotificationHandler<TNotification> {
@Override
public abstract void handle(TNotification notification);
}
public class Unit implements Comparable<Unit> {
public static final Unit VALUE = new Unit();
private Unit() {
}
@Override
public boolean equals(Object obj) {
return true;
}
@Override
public int hashCode() {
return 0;
}
@Override
public String toString() {
return "()";
}
@Override
public int compareTo(@NotNull Unit o) {
return 0;
}
}
public interface IUnitRequest extends IRequest<Unit> {
}
public class MediatorException extends RuntimeException {
}
@Getter
public class NoRequestHandlerException extends MediatorException {
private Class<?> requestClass;
public NoRequestHandlerException(
Class<?> requestClass
) {
this.requestClass = requestClass;
}
}
應(yīng)用場(chǎng)景
mediatr 是一種進(jìn)程內(nèi)消息傳遞機(jī)制利术,使用泛型支持消息的只能調(diào)度,其核心是 消息解耦
低矮,基于MediatR可以實(shí)現(xiàn)CQRS/EventBus等印叁。
解除構(gòu)造函數(shù)的依賴注入
public class DashboardController(
ICustomerRepository customerRepository,
IOrderService orderService,
ICustomerHistoryRepository historyRepository,
IOrderRepository orderRepository,
IProductRespoitory productRespoitory,
IRelatedProductsRepository relatedProductsRepository,
ISupportService supportService,
ILog logge
) {
}
借助 Mediator,僅需構(gòu)造注入ImediatR即可
public class DashboardController(
IMediator
) {
}
service 循環(huán)依賴军掂,使用mediatr 進(jìn)行依賴解耦轮蜕,并使用mediatr進(jìn)行消息傳遞
兩個(gè)service類和接口如下
public static interface IDemoAService {
String hello();
String helloWithB();
}
public static interface IDemoBService {
String hello();
String helloWithA();
}
public static class DemoAService implements IDemoAService {
private final IDemoBService bService;
public DemoAService(IDemoBService aService) {
this.bService = aService;
}
@Override
public String hello() {
return this.bService.helloWithA();
}
@Override
public String helloWithB() {
return "call A in B";
}
}
public static class DemoBService implements IDemoBService {
private final IDemoAService aService;
public DemoBService(IDemoAService aService) {
this.aService = aService;
}
@Override
public String hello() {
return this.aService.helloWithB();
}
@Override
public String helloWithA() {
return "call B in A";
}
}
此時(shí),如果通過(guò)構(gòu)造函數(shù)或?qū)傩宰⑷?@Autowird)蝗锥,程序在運(yùn)行時(shí)會(huì)報(bào)一下錯(cuò)誤, 提示檢測(cè)是否包括循環(huán)引用
使用 mediatr 解耦循環(huán)依賴
使用mediatr的service如下跃洛,在service構(gòu)造函數(shù)注 IMediator
,并實(shí)現(xiàn) IRequestHandler
接口
public static class DemoAService implements IDemoAService, IRequestHandler<RequestAService, String> {
//private final IDemoBService bService;
private final IMediator mediator;
public DemoAService(IMediator mediator) {
this.mediator = mediator;
}
@Override
public String hello() {
return this.mediator.send(new RequestBService());
}
@Override
public String helloWithB() {
return "call A in B";
}
@Override
public String handle(RequestAService request) {
return this.helloWithB();
}
}
public static class DemoBService implements IDemoBService, IRequestHandler<RequestBService, String> {
//private final IDemoAService aService;
private final IMediator mediator;
public DemoBService(IMediator mediator) {
this.mediator = mediator;
}
@Override
public String hello() {
return this.mediator.send(new RequestAService());
}
@Override
public String helloWithA() {
return "call B in A";
}
@Override
public String handle(RequestBService request) {
return this.helloWithA();
}
}
public static class RequestAService implements IRequest<String> {
}
public static class RequestBService implements IRequest<String> {
}
測(cè)試代碼如下
@ExtendWith(SpringExtension.class)
@Import(
value = {
Mediator.class,
ServiceCycTests.DemoAService.class,
ServiceCycTests.DemoBService.class,
}
)
public class ServiceCycTests {
@Autowired
IDemoAService aService;
@Autowired
IDemoBService bService;
@Test
public void should() {
String a = aService.hello();
assertThat(a).isEqualTo("call B in A");
String b = bService.hello();
assertThat(b).isEqualTo("call A in B");
}
}
測(cè)試結(jié)果