.NET Core 消息传递:MediatR

MediatR 是参考中介者模式实现的一个 .NET 工具类库,支持在进程内以单播或多播的形式进行消息传递,通过使用 MediatR 可实现消息的发送和处理充分解耦。

在介绍 MediatR 之前,先简单了解下中介者模式。中介者模式主要是指定义一个中介对象来调度一系列对象之间的交互关系,各对象之间不需要显式的相互引用,降低耦合性。如下对比图(普通模式与中介者模式的区别):

实际上从 MediatR 源代码中可以看出,它本身也并非标准中介者模式的实现,所以这里简单了解即可。接下来将先介绍 MediatR 的两种消息传递方式的使用方式,然后再分析其具体实现。

创建一个 .NET Core Web API 项目并安装 MediatR.Extensions.Microsoft.DependencyInjection NuGet 包(已含 MediatR NuGet 包),然后在 ConfigureServices 中注册服务。

1
2
// 扫描 Startup 所在程序集内实现了 Handler 的对象并添加到 IoC 容器中
services.AddMediatR(typeof(Startup));

可通过查看 MediatR.Extensions.Microsoft.DependencyInjection 说明了解 AddMediatR 具体包含了哪些服务的注册以及各注册对象的生命周期,基本通过以上一行代码就已经把 MediatR 相关的服务全部注册到 IoC 容器中。

单播消息传递

单播消息传递主要涉及 IRequest(消息类型) 和 IRequestHandler(消息处理) 两个接口。

定义接口 IRequest 的实现类,string 指定消息处理方法的返回值类型,如下:

1
2
3
4
public class GenericRequest : IRequest<string>
{
public string Name { get; set; }
}

定义接口 IRequestHandler 的实现类,GenericRequest 指定此 Handler 要处理的消息类型,string 指定消息处理方法的返回值类型(IRequest 指定的泛型类型一致),另外需实现 Handle 方法,如下:

1
2
3
4
5
6
7
public class GenericRequestHandler : IRequestHandler<GenericRequest, string>
{
public Task<string> Handle(GenericRequest request, CancellationToken cancellationToken)
{
return Task.FromResult($"This is {request.Name}");
}
}

在 Controller 中进行调用测试:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private readonly IMediator _mediator;

public MediatorController(IMediator mediator)
{
_mediator = mediator;
}

[HttpGet]
public async Task<string> GenericRequest()
{
var result = await _mediator.Send(new GenericRequest
{
Name = "GenericRequest"
});
return result;
}

另外针对不同的代码实现方式,有其他的 request-types 可选,本质上还是基于 IRequestIRequestHandler 的扩展。

多播消息传递

多播消息传递主要涉及 INotification(消息类型) 和 INotificationHandler(消息处理) 两个接口,另外多播消息传递是无返回值的。

定义接口 INotification 的实现类,如下:

1
2
3
4
public class GenericNotification : INotification
{
public string Name { get; set; }
}

定义接口 INotificationHandler 的实现类,GenericNotification 指定此 Handler 要处理的消息类型,另外需实现 Handle 方法,这里将为此消息类型定义两个 NotificationHandler 实现类,如下:

1
2
3
4
5
6
7
8
public class GenericANotificationHandler : INotificationHandler<GenericNotification>
{
public Task Handle(GenericNotification notification, CancellationToken cancellationToken)
{
Console.WriteLine($"A {notification.Name}");
return Task.CompletedTask;
}
}

1
2
3
4
5
6
7
8
public class GenericBNotificationHandler : INotificationHandler<GenericNotification>
{
public Task Handle(GenericNotification notification, CancellationToken cancellationToken)
{
Console.WriteLine($"B {notification.Name}");
return Task.CompletedTask;
}
}

在 Controller 中进行调用测试:

1
2
3
4
5
6
7
8
[HttpGet]
public async Task GenericNotification()
{
await _mediator.Publish(new GenericNotification
{
Name = "GenericNotification"
});
}

原理分析

建议阅读下源码,代码量少且结构清晰,基本理解没什么难度

通过前面的介绍可以了解在 MediatR 中面向开发者的核心接口主要是 IRequest&IRequestHandlerINotification&INotificationHandlerIMediator

如下 IMediator 的实现类 Mediator 中的定义:

1
2
3
4
5
6
public class Mediator : IMediator
{
private readonly ServiceFactory _serviceFactory;
private static readonly ConcurrentDictionary<Type, object> _requestHandlers = new ConcurrentDictionary<Type, object>();
private static readonly ConcurrentDictionary<Type, NotificationHandlerWrapper> _notificationHandlers = new ConcurrentDictionary<Type, NotificationHandlerWrapper>();
}

首先定义了 ServiceFactory 对象,它代表当前应用程序的 IoC 容器,在应用初始化阶段进行了注入,如 MediatR.Extensions.Microsoft.DependencyInjection 已包含了对应的 ServiceFactory 注册。由于 ServiceFactory 可自定义,所以开发中也完全可以选择其他的含 IoC 容器功能的框架,如 AutofacCastle WindsorDryIoc 等。

另外定义 _requestHandlers_notificationHandlers 分别保存单播和多播消息对象类型对应的 HandlerWrapper 对象,HandlerWrapper 的主要是对 ServiceFactory 对象的传递,最终通过 ServiceFactory 从 IoC 容器中获取对应消息类型的 Handler 对象。

MeidatR 还支持为单播消息定义消息处理的 Pipeline,如通过实现 IRequestPreProcessorIRequestPostProcessor 在消息处理前后自定义处理行为,通过实现 IRequestExceptionHandlerIRequestExceptionAction 在异常时自定义处理行为,这些实现类也是通过 ServiceFactory 从 IoC 容器中获取。

以下是单播消息处理的核心代码:

1
2
3
4
5
6
7
8
9
public override Task<TResponse> Handle(IRequest<TResponse> request, CancellationToken cancellationToken, ServiceFactory serviceFactory)
{
Task<TResponse> Handler() => GetHandler<IRequestHandler<TRequest, TResponse>>(serviceFactory).Handle((TRequest) request, cancellationToken);

return serviceFactory
.GetInstances<IPipelineBehavior<TRequest, TResponse>>()
.Reverse()
.Aggregate((RequestHandlerDelegate<TResponse>) Handler, (next, pipeline) => () => pipeline.Handle((TRequest)request, cancellationToken, next))();
}

首先从 ServiceFactory 获取 IPipelineBehavior,然后通 Linq 的 Reverse 方法进行顺序颠倒,最后通过 Aggregate 进行委托传递并执行,所以最终执行顺序是 RequestPreProcessorBehaviorHandlerRequestPostProcessorBehavior,这里的实现可能较难理解,核心是 Aggregate 的使用。

总结

MediatR 在实现上核心是通过保存消息请求对象与消息处理对象的关系,配合 IoC 容器实现的消息传递解耦。在实际应用中,通过 MediatR 多播消息传递可以使代码实现逻辑上更加简洁,另外也有较多的文章介绍了通过 MediatR 实现 CQRSEventBus 等。

如果对你有帮助就好