中间件需求概况
能够适配多种队列框架(如 RabbitMQ、Kafka等)。
支持通过 appsettings.json 文件配置。
支持手动强代码方式配置。
支持依赖注入和静态实例方式调用。
支持多个队列消息发布,即将消息写入到不同的队列中。
支持多个队列的消息消费,即能够订阅多个队列。
中间件核心设计
MessageProviderFactory(消息提供者工厂)
MessageProviderFactory 负责创建 MessageProvider(消息提供者),每个 MessageProvider 对应一个队列生产者和消费者,开发者在代码中可以自由的选择消息要通过 Provider 发给哪个队列。Factory 的核心代码如下:
IMessageProviderFactory.cs
/// <summary> /// /// </summary> public interface IMessageProviderFactory { /// <summary> /// /// </summary> /// <param name="providerName"></param> /// <returns></returns> IMessageProvider GetProvider(string providerName =MessageEngineConstValue.DefaultName); }
MessageProviderFactory.cs
/// <summary> /// /// </summary> internal class MessageProviderFactory : IMessageProviderFactory { /// <summary> /// 保存消息提供者 /// </summary> private readonly Dictionary<string, IMessageProvider> providers = new Dictionary<string, IMessageProvider>(); /// <summary> /// /// </summary> /// <param name="serviceProviders"></param> public MessageProviderFactory(IEnumerable<IMessageProvider> serviceProviders) { foreach (var provider in serviceProviders) { if (providers.ContainsKey(provider.ProviderName)) throw new ArgumentException($"已添加名称为【{provider.ProviderName} 】的消息提供程序。"); this.providers.Add(provider.ProviderName, provider); } } /// <summary> /// /// </summary> /// <param name="providerName"></param> /// <returns></returns> public IMessageProvider GetProvider(string providerName = MessageEngineConstValue.DefaultName) { if (string.IsNullOrWhiteSpace(providerName)) throw new ArgumentNullException("参数 providerName 不能为空。"); if (!providers.ContainsKey(providerName)) throw new KeyNotFoundException($"没有找到名称为【{providerName} 】的消息提供程序。"); return providers[providerName]; } }
MessageProvider (消息提供者)
消息提供者用于创建消息引擎(如 Rabbit、Kafka)和 启动、停止消息引擎,其核心代码如下:
IMessageProvider.cs
/// <summary> /// 消息提供者 /// </summary> public interface IMessageProvider { /// <summary> /// 当前提供程序的名称 /// </summary> string ProviderName { get;} /// <summary> /// 使用的消息引擎,例如 rabbit /// </summary> IMessageEngine Engine { get;} /// <summary> /// 启动消息服务提供程序 /// </summary> void Start(); /// <summary> /// 停止消息服务提供程序 /// </summary> void Stop(); }
DefaultMessageProvider.cs
/// <summary> /// /// </summary> public class DefaultMessageProvider : IMessageProvider { /// <summary> /// 获取当前提供程序中关联的消息引擎实例 /// </summary> public IMessageEngine Engine { get; private set; } /// <summary> /// 当前提供程序的名称 /// </summary> public string ProviderName { get; private set; } public DefaultMessageProvider(IEnumerable<IMessageEngine> engines, string providerName = MessageEngineConstValue.DefaultName) { this.ProviderName = providerName; foreach (var engine in engines) { if (engine.EngineName == this.ProviderName) { this.Engine = engine; } } this.Start(); } /// <summary> /// 启动消息引擎 /// </summary> public void Start() { this.Engine.Start(); } /// <summary> /// 停止消息引擎 /// </summary> public void Stop() { this.Engine.Stop(); } }
IMessageEngine (消息引擎)
消息引擎 负责将消息发送到相应的MQ中间件,并且维护一个单例的连接。
IMessageEngine.cs
/// <summary> /// 消息引擎接口 /// </summary> public interface IMessageEngine { /// <summary> /// 消息引擎名称 /// </summary> string EngineName { get; set; } /// <summary> /// 启动消息服务引擎 /// </summary> void Start(); /// <summary> /// 停止消息服务引擎 /// </summary> void Stop(); /// <summary> /// 通过同步方式发送消息 /// </summary> /// <param name="msg"></param> void Send(Message msg); /// <summary> /// 通过异步方式发送消息 /// </summary> /// <param name="msg"></param> Task SendAsync(Message msg); }
RabbitMessageEngine.cs
/// <summary> /// rabbit 消息引擎 /// </summary> internal class RabbitMessageEngine : IMessageEngine { /// <summary> /// /// </summary> private readonly RabbitQueueOptions options; /// <summary> /// 引擎的消息生产者服务 /// </summary> private ProducerService producerService; /// <summary> /// /// </summary> private readonly ILoggerFactory loggerFactory; /// <summary> /// /// </summary> /// <param name="config"></param> public RabbitMessageEngine(string name, RabbitOptions options, ILoggerFactory loggerFactory) { this.EngineName = name; this.options = options.MessageQueue; this.loggerFactory = loggerFactory; } /// <summary> /// 引擎名称 /// </summary> public string EngineName { get; set; } /// <summary> /// 同步发送消息 /// </summary> /// <param name="msg"></param> public void Send(Message msg) { producerService.PublishMsg<Message>(options.ExchangeName, options.RouteKey, msg); } /// <summary> /// 异步发送消息 /// </summary> /// <param name="msg"></param> public Task SendAsync(Message msg) { return Task.Factory.StartNew(() => { this.Send(msg); }); } /// <summary> /// 启动引擎 /// </summary> public void Start() { // 创建rabbitmq 连接 producerService = new ProducerService(options, loggerFactory?.CreateLogger<ProducerService>()); } /// <summary> /// 停止引擎 /// </summary> public void Stop() { } }
IMessageSink (消息消费管道)
消息消费管道,负责真实消息的消费,每个 Provider 能够配置多个消息消费管道,每个管道应该被分配为处理不同的业务。如 SMMessageSink 应该负责短信通知业务,IMMessageSink 应该负责 IM 通知业务。
IMessageSink.cs
/// <summary> /// 消息消费管道 /// </summary> public interface IMessageSink { /// <summary> /// 加载配置文件 /// </summary> /// <param name="sinkCfg"></param> void LoadFromConfig(SinkConfigSection sinkCfg); /// <summary> /// 处理消息 /// </summary> /// <param name="msg"></param> SinkResult Process(Message msg); }
Message (消息体)
Message 约定了推送到队列中消息内容的格式。
/// <summary> /// 消息基类 /// </summary> [Serializable] public class Message { /// <summary> /// /// </summary> public Message() { this.Headers = new Dictionary<string, object>(); this.UniqueId = Guid.NewGuid().ToString("N"); this.Headers.Add("$MessageVersion", System.Reflection.Assembly.GetExecutingAssembly().GetName().Version); this.Headers.Add("$CreateTime", DateTime.Now); } /// <summary> /// 消息体 /// </summary> public object Body { get; set; } /// <summary> /// 消息类型 /// </summary> public string MessageType { get; set; } /// <summary> /// 消息唯一编号,实例化时会默认赋 GUID 值 /// </summary> public string UniqueId { get; set; } /// <summary> /// 消息头集合 /// </summary> public Dictionary<string, object> Headers { get; set; } /// <summary> /// 创建一个消息实例 /// </summary> /// <returns></returns> public static Message CreateMessage() { return new Message() ; } /// <summary> /// 创建一个消息实例 /// </summary> /// <returns></returns> public static Message CreateMessage(string messageType) { return new Message() { MessageType = messageType }; } /// <summary> /// 创建一个消息实例 /// </summary> /// <returns></returns> public static Message CreateMessage(string messageType, object body) { return new Message() { MessageType = messageType, Body = body }; } }
RabbitListenService (队列监听后台服务)
RabbitListenService 继承 IHostedService,负责维护队列的监听,实时对消息进行消费和分发。
/// <summary> /// rabbit 消息监听者,负责监听队列,消费消息 /// </summary> internal class RabbitListenService : IHostedService { /// <summary> /// 配置文件 /// </summary> private readonly List<RabbitOptions> options; /// <summary> /// 保存消息消费者对象服务 /// </summary> private readonly List<ConsumerService> consumerServices; /// <summary> /// The logger /// </summary> private readonly ILogger logger; /// <summary> /// /// </summary> private readonly ILoggerFactory loggerFactory; /// <summary> /// /// </summary> /// <param name="options"></param> public RabbitListenService(List<RabbitOptions> options, ILoggerFactory loggerFactory) { this.options = options; this.loggerFactory = loggerFactory; logger = loggerFactory?.CreateLogger<RabbitListenService>(); consumerServices = new List<ConsumerService>(); } /// <summary> /// 获取注册的消息消费管道 /// </summary> /// <param name="channelCfg"></param> private Dictionary<string, List<IMessageSink>> GetChannelSinks(List<ChannelConfigSection> channelCfg) { var messageChannels = new Dictionary<string, List<IMessageSink>>(); foreach (var channel in channelCfg) { var sinks = new List<IMessageSink>(); foreach (var sinkCfg in channel.Sinks) { Type t = Type.GetType(sinkCfg.SinkType); IMessageSink sink = (IMessageSink)Activator.CreateInstance(t); sink.LoadFromConfig(sinkCfg); sinks.Add(sink); logger?.LogTrace($"注册消息类型为【{channel.MessageType}】的消息处理管道【{sinkCfg.SinkType}】"); } messageChannels.Add(channel.MessageType, sinks); } return messageChannels; } /// <summary> /// 分发消息 /// </summary> /// <returns></returns> private SinkResult Dispatcher(string content, Dictionary<string, List<IMessageSink>> messageChannels) { Message msg; try { var jsonSetting = new JsonSerializerSettings { ReferenceLoopHandling = ReferenceLoopHandling.Ignore }; msg = JsonConvert.DeserializeObject<Message>(content, jsonSetting); logger?.LogTrace($"消息内容反序列为 Message 对象成功,消息ID为:【{msg.UniqueId}】"); } catch (Exception ex) { logger?.LogError(ex, "消息内容反序列化出现异常"); return new SinkResult() { IsReQueue = false, IsSuccess = false, ErrorMessage = "消息内容反序列化出现异常" }; } foreach (var channel in messageChannels) { logger?.LogTrace($"消息ID为:【{msg.UniqueId} 】的消息类型为【{msg.MessageType}】"); if (channel.Key != msg.MessageType) continue; foreach (var sink in channel.Value) { try { logger?.LogTrace($"消息ID为:【{msg.UniqueId} 】的消息开始执行处理管道 【{sink.GetType()}】"); var result = sink.Process(msg); if (!result.IsSuccess) { logger?.LogWarning($"消息ID为:【{msg.UniqueId} 】消息在管道【{sink.GetType()}】处理时失败"); return result; } logger?.LogInformation($"消息ID为:【{msg.UniqueId} 】消息被管道【{sink.GetType()}】正确执行"); } catch (Exception ex) { logger?.LogError(ex, $"消息ID为:【{msg.UniqueId} 】消息在管道 【{sink.GetType()}】处理时出现异常"); return new SinkResult() { IsReQueue = false, IsSuccess = false, ErrorMessage = $"消息管道{sink.GetType().ToString()}处理消息出现异常" }; } } } return new SinkResult() { IsSuccess = true }; } /// <summary> /// 启动后台服务 /// </summary> /// <param name="cancellationToken"></param> /// <returns></returns> public Task StartAsync(CancellationToken cancellationToken) { try { foreach (var option in options) { var messageChannels = GetChannelSinks(option.Channels); logger?.LogTrace($"开始初始化消息提供者【{option.ProviderName}】的队列监听服务"); // 创建rabbitmq 连接 var consumerService = new ConsumerService(option.MessageQueue, loggerFactory?.CreateLogger<ConsumerService>()); consumerService.ReceiveMessage = Dispatcher; consumerService.ListenMessage(option.MessageQueue.QueueName, messageChannels); logger?.LogTrace($"完成初始化消息提供者【{option.ProviderName}】的队列监听服务"); consumerServices.Add(consumerService); } } catch (Exception ex) { logger?.LogError(ex, "启动消息引擎队列监听服务时出现异常"); } return Task.CompletedTask; } /// <summary> /// 停止后台服务 /// </summary> /// <param name="cancellationToken"></param> /// <returns></returns> public Task StopAsync(CancellationToken cancellationToken) { foreach (var item in consumerServices) { item.CloseConnect(); } consumerServices.Clear(); return Task.CompletedTask; } }
MessageEngineServiceCollectionExtensions (服务注入)
/// <summary> /// 消息引擎 /// </summary> public static class MessageEngineServiceCollectionExtensions { /// <summary> /// 通过配置文件方式注册服务 /// </summary> /// <param name="services"></param> /// <param name="configuration"></param> /// <param name="registerAction"></param> /// <returns></returns> public static IServiceCollection AddMessageEngine(this IServiceCollection services, IConfiguration configuration , Action<MessageEngineOptions, EngineType, IConfiguration, string> registerAction) { services.AddSingleton<IMessageProviderFactory, MessageProviderFactory>(); var options = new MessageEngineOptions(); var providersSection = configuration.GetSection("Providers").GetChildren().ToList(); #region 注册消息引擎服务 foreach (var providerSection in providersSection) { // 使用的消息引擎类型 var engineType = providerSection.GetValue<string>("EngineType"); // 提供程序名称 var providerName = providerSection.GetValue<string>("ProviderName"); if (string.IsNullOrWhiteSpace(engineType)) { throw new ArgumentException("消息引擎类型【EngineType】属性必须配置,其值为:Rabbit 。"); } if (string.IsNullOrWhiteSpace(providerName)) { throw new ArgumentException("消息引擎提供者名称【ProviderName】属性必须配置,并且该名称是唯一的。"); } if (Enum.TryParse(engineType, true, out EngineType engineEnum)) { registerAction(options, engineEnum, providerSection, providerName); } else { throw new ArgumentException("消息引擎类型【EngineType】属性必须配置,其值为:Rabbit 。"); } } foreach (var serviceExtension in options.Extensions) { serviceExtension.AddServices(services); } services.AddSingleton(options); #endregion return services; } /// <summary> /// 通过手动方式注册服务 /// </summary> /// <param name="services"></param> /// <param name="setupAction"></param> /// <returns></returns> public static IServiceCollection AddMessageEngine(this IServiceCollection services, Action<MessageEngineOptions> setupAction) { services.AddSingleton<IMessageProviderFactory, MessageProviderFactory>(); var options = new MessageEngineOptions(); setupAction(options); foreach (var serviceExtension in options.Extensions) { serviceExtension.AddServices(services); } services.AddSingleton(options); return services; } }
通过配置文件配置
appsettings.json
"Message": { "Providers": [ { "ProviderName": "Default", "EngineType": "Rabbit", "MessageQueue": { "HostName": "127.0.0.1", "Port": 5672, "UserName": "guest", "Password": "guest", "ExchangeName": "flowtest", "QueueName": "flowtest", "RouteKey": "flowtest", "Properties": [ ] }, "Channels": [ { "MessageType": "IM", "Sinks": [ { "SinkType": "MessageEngines.Test.DefaultMessageSink,MessageEngines.Test" } ] } ] }, { "ProviderName": "SM", "EngineType": "Rabbit", "MessageQueue": { "hostName": "127.0.0.1", "port": 5672, "userName": "guest", "password": "guest", "exchangeName": "flowtest2", "queueName": "flowtest2", "routeKey": "flowtest2", "Properties": [ ] }, "Channels": [ { "MessageType": "SM", "Sinks": [ { "SinkType": "MessageEngines.Test.SMMessageSink,MessageEngines.Test" } ] } ] } ] }
startup.cs
services.AddMessageEngine(Configuration.GetSection("Message"), (o, t, c, p)=> { if (t == EngineType.Rabbit) { o.UseRabbit(c, p); } });
代码方式配置
startup.cs
services.AddMessageEngine((o) => { o.UseRabbit((r) => { r.ProviderName = "Default"; r.MessageQueue.HostName = "127.0.0.1"; r.MessageQueue.Port = 5672; r.MessageQueue.UserName = "guest"; r.MessageQueue.Password= "guest"; r.MessageQueue.QueueName = "flowtest"; r.MessageQueue.ExchangeName = "flowtest"; r.MessageQueue.RouteKey = "flowtest"; r.Channels = new System.Collections.Generic.List<Core.Config.ChannelConfigSection>() { new Core.Config.ChannelConfigSection(){ MessageType = "IM", Sinks=new System.Collections.Generic.List<Core.Config.SinkConfigSection>(){ new Core.Config.SinkConfigSection() { SinkType= "MessageEngines.Test.DefaultMessageSink,MessageEngines.Test" } } } }; }, "Default"); o.UseRabbit((r) => { r.ProviderName = "SM"; r.MessageQueue.HostName = "127.0.0.1"; r.MessageQueue.HostName = "127.0.0.1"; r.MessageQueue.Port = 5672; r.MessageQueue.UserName = "guest"; r.MessageQueue.Password = "guest"; r.MessageQueue.QueueName = "flowtest2"; r.MessageQueue.ExchangeName = "flowtest2"; r.MessageQueue.RouteKey = "flowtest2"; r.Channels = new System.Collections.Generic.List<Core.Config.ChannelConfigSection>() { new Core.Config.ChannelConfigSection(){ MessageType = "SM", Sinks=new System.Collections.Generic.List<Core.Config.SinkConfigSection>(){ new Core.Config.SinkConfigSection() { SinkType= "MessageEngines.Test.SMMessageSink,MessageEngines.Test" } } } }; }, "SM"); });
写在最后
由于篇幅问题,很多代码无法贴出,望笔友理解。更详细的源码请留言或@我。
评论列表
评论内容: