夫天地者,万物之逆旅;光阴者,百代之过客。而浮生若梦,为欢几何?
如何封装一个适用于 .Net Core 的通用型消息队列组件

中间件需求概况

  1. 能够适配多种队列框架(如 RabbitMQ、Kafka等)。  

  2. 支持通过 appsettings.json 文件配置。  

  3. 支持手动强代码方式配置。  

  4. 支持依赖注入和静态实例方式调用。  

  5. 支持多个队列消息发布,即将消息写入到不同的队列中。  

  6. 支持多个队列的消息消费,即能够订阅多个队列。

中间件核心设计

MessageProviderFactory(消息提供者工厂)

MessageProviderFactory 负责创建 MessageProvider(消息提供者),每个 MessageProvider 对应一个队列生产者和消费者,开发者在代码中可以自由的选择消息要通过 Provider 发给哪个队列。Factory 的核心代码如下:


IMessageProviderFactory.cs

/// <summary>
    /// 
    /// </summary>
    public interface IMessageProviderFactory
    {
        /// <summary>
        /// 
        /// </summary>
        /// <param name="providerName"></param>
        /// <returns></returns>
        IMessageProvider GetProvider(string providerName =MessageEngineConstValue.DefaultName);
    }

MessageProviderFactory.cs

/// <summary>
    /// 
    /// </summary>
    internal class MessageProviderFactory : IMessageProviderFactory
    {
        /// <summary>
        /// 保存消息提供者
        /// </summary>
        private readonly Dictionary<string, IMessageProvider> providers = new Dictionary<string, IMessageProvider>();

        /// <summary>
        /// 
        /// </summary>
        /// <param name="serviceProviders"></param>
        public MessageProviderFactory(IEnumerable<IMessageProvider> serviceProviders)
        {
            foreach (var provider in serviceProviders)
            {
                if (providers.ContainsKey(provider.ProviderName))
                    throw new ArgumentException($"已添加名称为【{provider.ProviderName} 】的消息提供程序。");

                this.providers.Add(provider.ProviderName, provider);
            }
        }

        /// <summary>
        /// 
        /// </summary>
        /// <param name="providerName"></param>
        /// <returns></returns>
        public IMessageProvider GetProvider(string providerName = MessageEngineConstValue.DefaultName)
        {
            if (string.IsNullOrWhiteSpace(providerName))
                throw new ArgumentNullException("参数 providerName 不能为空。");
            if (!providers.ContainsKey(providerName))
                throw new KeyNotFoundException($"没有找到名称为【{providerName} 】的消息提供程序。");

            return providers[providerName];
        }
    }

MessageProvider (消息提供者)

消息提供者用于创建消息引擎(如 Rabbit、Kafka)和 启动、停止消息引擎,其核心代码如下:

IMessageProvider.cs

/// <summary>
    /// 消息提供者
    /// </summary>
    public interface IMessageProvider
    {
        /// <summary>
        /// 当前提供程序的名称
        /// </summary>
        string ProviderName { get;}

        /// <summary>
        /// 使用的消息引擎,例如 rabbit
        /// </summary>
        IMessageEngine Engine { get;}

        /// <summary>
        /// 启动消息服务提供程序
        /// </summary>
        void Start();

        /// <summary>
        /// 停止消息服务提供程序
        /// </summary>
        void Stop();
    }

DefaultMessageProvider.cs

/// <summary>
    /// 
    /// </summary>
    public class DefaultMessageProvider : IMessageProvider
    {
        /// <summary>
        /// 获取当前提供程序中关联的消息引擎实例
        /// </summary>
        public IMessageEngine Engine { get; private set; }

        /// <summary>
        /// 当前提供程序的名称
        /// </summary>
        public string ProviderName { get; private set; }

        public DefaultMessageProvider(IEnumerable<IMessageEngine> engines, string providerName = MessageEngineConstValue.DefaultName)
        {
            this.ProviderName = providerName;

            foreach (var engine in engines)
            {
                if (engine.EngineName == this.ProviderName)
                {
                    this.Engine = engine;
                }
            }

            this.Start();
        }

        /// <summary>
        /// 启动消息引擎
        /// </summary>
        public void Start()
        {
            this.Engine.Start();
        }

        /// <summary>
        /// 停止消息引擎
        /// </summary>
        public void Stop()
        {
            this.Engine.Stop();
        }
    }

IMessageEngine (消息引擎)

消息引擎 负责将消息发送到相应的MQ中间件,并且维护一个单例的连接。

IMessageEngine.cs

/// <summary>
    /// 消息引擎接口
    /// </summary>
    public interface IMessageEngine
    {
        /// <summary>
        /// 消息引擎名称
        /// </summary>
        string EngineName { get; set; }

        /// <summary>
        /// 启动消息服务引擎
        /// </summary>
        void Start();

        /// <summary>
        /// 停止消息服务引擎
        /// </summary>
        void Stop();

        /// <summary>
        /// 通过同步方式发送消息
        /// </summary>
        /// <param name="msg"></param>
        void Send(Message msg);

        /// <summary>
        /// 通过异步方式发送消息
        /// </summary>
        /// <param name="msg"></param>
        Task SendAsync(Message msg);
    }

RabbitMessageEngine.cs

/// <summary>
    /// rabbit 消息引擎
    /// </summary>
    internal class RabbitMessageEngine : IMessageEngine
    {
        /// <summary>
        /// 
        /// </summary>
        private readonly RabbitQueueOptions options;

        /// <summary>
        /// 引擎的消息生产者服务
        /// </summary>
        private ProducerService producerService;

        /// <summary>
        /// 
        /// </summary>
        private readonly ILoggerFactory loggerFactory;

        /// <summary>
        /// 
        /// </summary>
        /// <param name="config"></param>
        public RabbitMessageEngine(string name, RabbitOptions  options, ILoggerFactory loggerFactory)
        {
            this.EngineName = name;
            this.options = options.MessageQueue;
            this.loggerFactory = loggerFactory;
        }

        /// <summary>
        /// 引擎名称
        /// </summary>
        public string EngineName { get; set; }

        /// <summary>
        /// 同步发送消息
        /// </summary>
        /// <param name="msg"></param>
        public void Send(Message msg)
        {
            producerService.PublishMsg<Message>(options.ExchangeName, options.RouteKey, msg);
        }

        /// <summary>
        /// 异步发送消息
        /// </summary>
        /// <param name="msg"></param>
        public Task SendAsync(Message msg)
        {
           return Task.Factory.StartNew(() =>
            {
                this.Send(msg);
            });
        }

        /// <summary>
        /// 启动引擎
        /// </summary>
        public void Start()
        {
            // 创建rabbitmq 连接
            producerService = new ProducerService(options, loggerFactory?.CreateLogger<ProducerService>());
        }

        /// <summary>
        /// 停止引擎
        /// </summary>
        public void Stop()
        {
        }
    }

IMessageSink (消息消费管道)

消息消费管道,负责真实消息的消费,每个 Provider 能够配置多个消息消费管道,每个管道应该被分配为处理不同的业务。如 SMMessageSink 应该负责短信通知业务,IMMessageSink 应该负责 IM 通知业务。

IMessageSink.cs

/// <summary>
    /// 消息消费管道
    /// </summary>
    public interface IMessageSink
    {
        /// <summary>
        /// 加载配置文件
        /// </summary>
        /// <param name="sinkCfg"></param>
        void LoadFromConfig(SinkConfigSection sinkCfg);

        /// <summary>
        /// 处理消息
        /// </summary>
        /// <param name="msg"></param>
        SinkResult Process(Message msg);
    }

Message (消息体)

Message 约定了推送到队列中消息内容的格式。

/// <summary>
    /// 消息基类
    /// </summary>
    [Serializable]
    public class Message
    {
        /// <summary>
        /// 
        /// </summary>
        public Message()
        {
            this.Headers = new Dictionary<string, object>();
            this.UniqueId = Guid.NewGuid().ToString("N");
            this.Headers.Add("$MessageVersion", System.Reflection.Assembly.GetExecutingAssembly().GetName().Version);
            this.Headers.Add("$CreateTime", DateTime.Now);
        }

        /// <summary>
        /// 消息体
        /// </summary>
        public object Body { get; set; }

        /// <summary>
        /// 消息类型
        /// </summary>
        public string MessageType { get; set; }

        /// <summary>
        /// 消息唯一编号,实例化时会默认赋 GUID 值
        /// </summary>
        public string UniqueId { get; set; }

        /// <summary>
        /// 消息头集合
        /// </summary>
        public Dictionary<string, object> Headers { get; set; }

        /// <summary>
        /// 创建一个消息实例
        /// </summary>
        /// <returns></returns>
        public static Message CreateMessage()
        {
            return new Message() ;
        }

        /// <summary>
        /// 创建一个消息实例
        /// </summary>
        /// <returns></returns>
        public static Message CreateMessage(string messageType)
        {
            return new Message() { MessageType = messageType };
        }

        /// <summary>
        /// 创建一个消息实例
        /// </summary>
        /// <returns></returns>
        public static Message CreateMessage(string messageType, object body)
        {
            return new Message() { MessageType = messageType, Body = body };
        }
    }

RabbitListenService (队列监听后台服务)

RabbitListenService 继承 IHostedService,负责维护队列的监听,实时对消息进行消费和分发。

/// <summary>
    /// rabbit 消息监听者,负责监听队列,消费消息
    /// </summary>
    internal class RabbitListenService : IHostedService
    {
        /// <summary>
        /// 配置文件
        /// </summary>
        private readonly List<RabbitOptions> options;

        /// <summary>
        /// 保存消息消费者对象服务
        /// </summary>
        private readonly List<ConsumerService> consumerServices;

        /// <summary>
        /// The logger
        /// </summary>
        private readonly ILogger logger;

        /// <summary>
        /// 
        /// </summary>
        private readonly ILoggerFactory loggerFactory;

        /// <summary>
        /// 
        /// </summary>
        /// <param name="options"></param>
        public RabbitListenService(List<RabbitOptions> options, ILoggerFactory loggerFactory)
        {
            this.options = options;
            this.loggerFactory = loggerFactory;
            logger = loggerFactory?.CreateLogger<RabbitListenService>();
            consumerServices = new List<ConsumerService>();
        }

        /// <summary>
        /// 获取注册的消息消费管道
        /// </summary>
        /// <param name="channelCfg"></param>
        private Dictionary<string, List<IMessageSink>> GetChannelSinks(List<ChannelConfigSection> channelCfg)
        {
            var messageChannels = new Dictionary<string, List<IMessageSink>>();

            foreach (var channel in channelCfg)
            {
                var sinks = new List<IMessageSink>();

                foreach (var sinkCfg in channel.Sinks)
                {
                    Type t = Type.GetType(sinkCfg.SinkType);
                    IMessageSink sink = (IMessageSink)Activator.CreateInstance(t);
                    sink.LoadFromConfig(sinkCfg);
                    sinks.Add(sink);
                    logger?.LogTrace($"注册消息类型为【{channel.MessageType}】的消息处理管道【{sinkCfg.SinkType}】");
                }
                messageChannels.Add(channel.MessageType, sinks);
            }

            return messageChannels;
        }

        /// <summary>
        /// 分发消息
        /// </summary>
        /// <returns></returns>
        private SinkResult Dispatcher(string content, Dictionary<string, List<IMessageSink>> messageChannels)
        {
            Message msg;
            try
            {
                var jsonSetting = new JsonSerializerSettings
                {
                    ReferenceLoopHandling = ReferenceLoopHandling.Ignore
                };
                msg = JsonConvert.DeserializeObject<Message>(content, jsonSetting);
                logger?.LogTrace($"消息内容反序列为 Message 对象成功,消息ID为:【{msg.UniqueId}】");
            }
            catch (Exception ex)
            {
                logger?.LogError(ex, "消息内容反序列化出现异常");

                return new SinkResult() { IsReQueue = false, IsSuccess = false, ErrorMessage = "消息内容反序列化出现异常" };
            }

            foreach (var channel in messageChannels)
            {
                logger?.LogTrace($"消息ID为:【{msg.UniqueId} 】的消息类型为【{msg.MessageType}】");

                if (channel.Key != msg.MessageType) continue;
                foreach (var sink in channel.Value)
                {
                    try
                    {
                        logger?.LogTrace($"消息ID为:【{msg.UniqueId} 】的消息开始执行处理管道 【{sink.GetType()}】");

                        var result = sink.Process(msg);

                        if (!result.IsSuccess)
                        {
                            logger?.LogWarning($"消息ID为:【{msg.UniqueId} 】消息在管道【{sink.GetType()}】处理时失败");
                            return result;
                        }

                        logger?.LogInformation($"消息ID为:【{msg.UniqueId} 】消息被管道【{sink.GetType()}】正确执行");
                    }
                    catch (Exception ex)
                    {
                        logger?.LogError(ex, $"消息ID为:【{msg.UniqueId} 】消息在管道 【{sink.GetType()}】处理时出现异常");

                        return new SinkResult() { IsReQueue = false, IsSuccess = false, ErrorMessage = $"消息管道{sink.GetType().ToString()}处理消息出现异常" };
                    }
                }
            }
            return new SinkResult() { IsSuccess = true };
        }

        /// <summary>
        /// 启动后台服务
        /// </summary>
        /// <param name="cancellationToken"></param>
        /// <returns></returns>
        public Task StartAsync(CancellationToken cancellationToken)
        {
            try
            {
                foreach (var option in options)
                {
                    var messageChannels = GetChannelSinks(option.Channels);

                    logger?.LogTrace($"开始初始化消息提供者【{option.ProviderName}】的队列监听服务");

                    // 创建rabbitmq 连接
                    var consumerService = new ConsumerService(option.MessageQueue, loggerFactory?.CreateLogger<ConsumerService>());

                    consumerService.ReceiveMessage = Dispatcher;

                    consumerService.ListenMessage(option.MessageQueue.QueueName, messageChannels);

                    logger?.LogTrace($"完成初始化消息提供者【{option.ProviderName}】的队列监听服务");

                    consumerServices.Add(consumerService);
                }
            }
            catch (Exception ex)
            {
                logger?.LogError(ex, "启动消息引擎队列监听服务时出现异常");
            }

            return Task.CompletedTask;
        }

        /// <summary>
        /// 停止后台服务
        /// </summary>
        /// <param name="cancellationToken"></param>
        /// <returns></returns>
        public Task StopAsync(CancellationToken cancellationToken)
        {
            foreach (var item in consumerServices)
            {
                item.CloseConnect();
            }
            consumerServices.Clear();
            return Task.CompletedTask;
        }
    }

MessageEngineServiceCollectionExtensions (服务注入)

/// <summary>
    /// 消息引擎
    /// </summary>
    public static class MessageEngineServiceCollectionExtensions
    {
        /// <summary>
        /// 通过配置文件方式注册服务
        /// </summary>
        /// <param name="services"></param>
        /// <param name="configuration"></param>
        /// <param name="registerAction"></param>
        /// <returns></returns>
        public static IServiceCollection AddMessageEngine(this IServiceCollection services, IConfiguration configuration
            , Action<MessageEngineOptions, EngineType, IConfiguration, string> registerAction)
        {
            services.AddSingleton<IMessageProviderFactory, MessageProviderFactory>();

            var options = new MessageEngineOptions();

            var providersSection = configuration.GetSection("Providers").GetChildren().ToList();

            #region 注册消息引擎服务

            foreach (var providerSection in providersSection)
            {
                // 使用的消息引擎类型
                var engineType = providerSection.GetValue<string>("EngineType");
                // 提供程序名称
                var providerName = providerSection.GetValue<string>("ProviderName");

                if (string.IsNullOrWhiteSpace(engineType))
                {
                    throw new ArgumentException("消息引擎类型【EngineType】属性必须配置,其值为:Rabbit 。");
                }
                if (string.IsNullOrWhiteSpace(providerName))
                {
                    throw new ArgumentException("消息引擎提供者名称【ProviderName】属性必须配置,并且该名称是唯一的。");
                }

                if (Enum.TryParse(engineType, true, out EngineType engineEnum))
                {
                    registerAction(options, engineEnum, providerSection, providerName);
                }
                else
                {
                    throw new ArgumentException("消息引擎类型【EngineType】属性必须配置,其值为:Rabbit 。");
                }
            }

            foreach (var serviceExtension in options.Extensions)
            {
                serviceExtension.AddServices(services);
            }
            services.AddSingleton(options);

            #endregion

            return services;
        }

        /// <summary>
        /// 通过手动方式注册服务
        /// </summary>
        /// <param name="services"></param>
        /// <param name="setupAction"></param>
        /// <returns></returns>
        public static IServiceCollection AddMessageEngine(this IServiceCollection services, Action<MessageEngineOptions> setupAction)
        {
            services.AddSingleton<IMessageProviderFactory, MessageProviderFactory>();

            var options = new MessageEngineOptions();

            setupAction(options);

            foreach (var serviceExtension in options.Extensions)
            {
                serviceExtension.AddServices(services);
            }

            services.AddSingleton(options);

            return services;
        }


    }

通过配置文件配置

appsettings.json

"Message": {
    "Providers": [
      {
        "ProviderName": "Default",
        "EngineType": "Rabbit",
        "MessageQueue": {
          "HostName": "127.0.0.1",
          "Port": 5672,
          "UserName": "guest",
          "Password": "guest",
          "ExchangeName": "flowtest",
          "QueueName": "flowtest",
          "RouteKey": "flowtest",
          "Properties": [
          ]
        },
        "Channels": [
          {
            "MessageType": "IM",
            "Sinks": [
              {
                "SinkType": "MessageEngines.Test.DefaultMessageSink,MessageEngines.Test"
              }
            ]
          }
        ]
      },
      {
        "ProviderName": "SM",
        "EngineType": "Rabbit",
        "MessageQueue": {
          "hostName": "127.0.0.1",
          "port": 5672,
          "userName": "guest",
          "password": "guest",
          "exchangeName": "flowtest2",
          "queueName": "flowtest2",
          "routeKey": "flowtest2",
          "Properties": [
          ]
        },
        "Channels": [
          {
            "MessageType": "SM",
            "Sinks": [
              {
                "SinkType": "MessageEngines.Test.SMMessageSink,MessageEngines.Test"
              }
            ]
          }
        ]
      }
    ]
  }

startup.cs

            services.AddMessageEngine(Configuration.GetSection("Message"), (o, t, c, p)=> { 
                if (t == EngineType.Rabbit)
                {
                    o.UseRabbit(c, p);
                }
            });

代码方式配置

startup.cs

services.AddMessageEngine((o) =>
            {
                o.UseRabbit((r) =>
                {
                    r.ProviderName = "Default";
                    r.MessageQueue.HostName = "127.0.0.1";
                    r.MessageQueue.Port = 5672;
                    r.MessageQueue.UserName = "guest";
                    r.MessageQueue.Password= "guest";
                    r.MessageQueue.QueueName = "flowtest";
                    r.MessageQueue.ExchangeName = "flowtest";
                    r.MessageQueue.RouteKey = "flowtest";
                    r.Channels = new System.Collections.Generic.List<Core.Config.ChannelConfigSection>() {
                        new Core.Config.ChannelConfigSection(){ 
                            MessageType = "IM", 
                            Sinks=new System.Collections.Generic.List<Core.Config.SinkConfigSection>(){ new Core.Config.SinkConfigSection() {  SinkType= "MessageEngines.Test.DefaultMessageSink,MessageEngines.Test" } } 
                        }
                    };
                }, "Default");

                o.UseRabbit((r) =>
                {
                    r.ProviderName = "SM";
                    r.MessageQueue.HostName = "127.0.0.1";
                    r.MessageQueue.HostName = "127.0.0.1";
                    r.MessageQueue.Port = 5672;
                    r.MessageQueue.UserName = "guest";
                    r.MessageQueue.Password = "guest";
                    r.MessageQueue.QueueName = "flowtest2";
                    r.MessageQueue.ExchangeName = "flowtest2";
                    r.MessageQueue.RouteKey = "flowtest2";
                    r.Channels = new System.Collections.Generic.List<Core.Config.ChannelConfigSection>() {
                        new Core.Config.ChannelConfigSection(){
                            MessageType = "SM",
                            Sinks=new System.Collections.Generic.List<Core.Config.SinkConfigSection>(){ new Core.Config.SinkConfigSection() {  SinkType= "MessageEngines.Test.SMMessageSink,MessageEngines.Test" } }
                        }
                    };
                }, "SM");
            });

写在最后

由于篇幅问题,很多代码无法贴出,望笔友理解。更详细的源码请留言或@我。

作者:暗夜余晖

本文版权归作者所有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。

0

支持

0

反对

posted @2020-7-15  拜读(413)

评论列表

#4楼 2020-9-27 27.188.223.188
感谢分享,请问博主WEB前端如何接收消息,使用stomp.js插件吗?麻烦发下源码,谢谢。409877518@qq.com
#3楼 2020-7-28 218.241.201.162
14720751@qq.com 麻烦给下源代码
#2楼 2020-7-25 124.193.98.2
把你邮箱贴出来,发你邮箱。
#1楼 2020-7-14 49.74.64.27
源码~~~~ 楼主

评论内容:



喜欢请打赏

支付宝 微信

请放心支付