夫天地者,万物之逆旅;光阴者,百代之过客。而浮生若梦,为欢几何?
.Net Core 项目实战:我是如何定义自己的日志记录组件(上篇)

日志记录组件需求

如今微服务架构越来越受青睐,各大社区、论坛每天都有大牛在侃侃而谈。笔者所在公司也在这条路上摸黑前进,新项目已全部改用.Net Core 开发,用到的技术栈为:

.Net Core + docker + ELK + RabbitMQ + Redis + Ocelot + Consul + Nginx + MySql

为了将微服务中每个零散服务的日志进行集中记录和管理,架构小组采用将日志消息推送到消息队列框架RabbitMQ中,再由 ELK 的 Logstash 从RabbitMQ 收集日志消息到 Elasticsearch,通过 Kibana 进行展示。所以笔者所做的工作是开发一个将程序中的日志(错误/调式/埋点/跟踪)推送到 RabbitMQ 中的日志记录组件。

.Net Core 日志三大接口

1. ILoggerFactory 日志记录工厂,负责遍历注册的LoggerProvider
2. ILoggerProvider 日志记录提供器,负责创建ILogger
3. ILogger 日志记录,真正处理日志的地方,,负责干活  

本篇笔者不介绍.Net Core日志记录的基本使用,假设你已有此基础,我们直接讲如何定义自己的日志记录组件。

新建RabbitLoggerOptions强类型选项

日志组件除了需要支持Core本身功能外还要增加一些额外功能,这些个性化设置都需要通过配置文件(appsetting.json)提供,所以选择Core的强类型选项作为支撑。

// RabbitLoggerOptions 定义如下

/// <summary>
    /// 
    /// </summary>
    public class RabbitLoggerOptions
    {
        /// <summary>
        /// 是否使用作用域日志
        /// </summary>
        public bool IncludeScopes { get; set; }
        /// <summary>
        /// 日志最小级别
        /// </summary>
        public LogLevel MinLevel { get; set; }
        /// <summary>
        /// 启用异步记录
        /// </summary>
        public bool EnableAsync { get; set; }
        /// <summary>
        /// 应用程序名称,可以自定义,会携带到消息记录中
        /// </summary>
        public string ApplicationName { get; set; }
    }

新建ILoggerPersistence和其实现RabbitLoggerPersistence

日志组件要求将日志输出到RabbitMQ中,但是考虑到日后持久化对象可能发生变化,所以我定义了一个ILoggerPersistence接口,该接口提供 Write、WriteAsync两个方法。

/// <summary>
    /// 定义 日志持久化 输出接口
    /// </summary>
    public interface ILoggerPersistence
    {
        /// <summary>
        /// 同步方式写入日志
        /// </summary>
        /// <param name="message"></param>
        void Write(LoggerMetadata message);
        /// <summary>
        /// 异步方式写入日志
        /// </summary>
        /// <param name="message"></param>
        /// <returns></returns>
        Task WriteAsync(LoggerMetadata message);
    }

RabbitLoggerPersistence接口实现类主要的作用是将日志格式化Push到RabbitMQ。笔者使用的Rabbit驱动为官方SDK,该SDK内部已经实现了自动重连,帮我们省去了很多工作。

在该类的构造函数中我们接收一个具有IOptionsMonitor<T>泛型类型的RabbitConnectOptions强类型选项,它将提供RabbitMQ连接字符等配置。笔友可能对IOptionsMonitor不胜了解,不过这些都不重要,使用它的原因是当appsetting.json 中的配置变更后可以利用IOptionsMonitor<T>的 OnChange事件获取到这些变更内容,然后应用到运行中的程序,不必对程序进行重启。说到这里是不是迫切的想一睹RabbitConnectOptions的定义呢?

 /// <summary>
    /// rabbitmq 数据连接配置
    /// </summary>
    public class RabbitConnectOptions
    {
        /// <summary>
        /// IP 地址
        /// </summary>
        public string HostName { get; set; }

        /// <summary>
        /// 端口号
        /// </summary>
        public int Port { get; set; }

        /// <summary>
        /// 用户账号
        /// </summary>
        public string UserName { get; set; }

        /// <summary>
        /// 用户密码
        /// </summary>
        public string Password { get; set; }

        /// <summary>
        /// 路由名称
        /// </summary>
        public string ExChange { get; set; }

        /// <summary>
        /// 路由
        /// </summary>
        public string RoutingKey { get; set; }

        /// <summary>
        /// 队列名称
        /// </summary>
        public string Queue { get; set; }

        /// <summary>
        /// 是否持久化
        /// </summary>
        public bool IsDurable { get; set; }
        /// <summary>
        /// 命名空间
        /// </summary>
        public string VirtualHost { get; set; }

        /// <summary>
        /// 是否启用缓存区, 开启后 会使用一个总线程负责消息推送,这样可以节约 线程的开销
        /// </summary>
        public bool EnableBuffer { get; set; }

定义好RabbitConnectOptions后回头再看RabbitLoggerPersistence是如何实现的。

在RabbitLoggerPersistence中需要对Write、WriteAsync这两个方法进行具体实现,从名字可以看出前者利用同步方式将消息Push到Rabbit,后者使用异步方式,这里我们只讲异步的实现思路。

Core中涉及到异步编程的知识,笔友立即会想到Task,没错,笔者也是该类进行的核心实现。但是大型web应用中,每次记录一条日志都会开启一个Task,在万级/百万级访问量的情况下对性能和服务器的压力将是巨大的考验。于是笔者决定采用类似缓冲池的概念去解决此问题。

使用微软提供的 BlockingCollection<T> 类可以实现类似队列的功能,在WriteAsync方法中我们将日记消息对象Add到BlockingCollection,在RabbitLoggerPersistence构造函数中开启一个Task,该Task将一直监听 BlockingCollection,如果从 BlockingCollection 获取到消息就调用Write方法,由Write将消息Push到Rabbit。下面是核心代码:

/// <summary>
        /// 
        /// </summary>
        /// <param name="message"></param>
        /// <returns></returns>
        public Task WriteAsync(LoggerMetadata message)
        {
            if (_RabbitConnectOptions.CurrentValue.EnableBuffer && !_messageQueue.IsAddingCompleted)
            {
                //启用缓冲区
                try
                {
                    _messageQueue.TryAdd(message);
                }
                catch (InvalidOperationException) { }
                return Task.CompletedTask;
            }
            else
            {
                return Task.Factory.StartNew(() =>
                {
                    try
                    {
                        Write(message);
                    }
                    catch (Exception ex)
                    {
                        Console.WriteLine(ex);
                    }
                });
            }
        }

写到这里笔友可能会有疑问,怎么保证上面的Task只会new一次的?怎么保证日志都被放到 BlockingCollection ,然后被Task处理的?你的RabbitMQ连接是创建一次,复用信道还是每次都要创建?其实这些在笔者设计之初都已经考虑进去了,在 Startup 中注册服务的时候 RabbitLoggerPersistence 和ILoggerPersistence 使用的是 AddSingleton 方式,这样就能保证在整个程序运行期间只会有一个RabbitLoggerPersistence实例。下面贴上核心代码:

/// <summary>
    /// rabbitmq 日志持久化
    /// </summary>
    public class RabbitLoggerPersistence : ILoggerPersistence, IDisposable
    {
        /// <summary>
        /// 
        /// </summary>
        private readonly IOptionsMonitor<RabbitConnectOptions> _RabbitConnectOptions;
        /// <summary>
        /// 
        /// </summary>
        private readonly BlockingCollection<LoggerMetadata> _messageQueue = new BlockingCollection<LoggerMetadata>();
        /// <summary>
        /// 
        /// </summary>
        private IDisposable _optionsReloadToken;
        /// <summary>
        /// 
        /// </summary>
        private IConnection _Connection;
         
        /// <summary>
        /// 
        /// </summary>
        /// <param name="rabbitConnectOptions"></param>
        public RabbitLoggerPersistence(IOptionsMonitor<RabbitConnectOptions> rabbitConnectOptions)
        {
            _RabbitConnectOptions = rabbitConnectOptions ?? throw new ArgumentNullException("请配置Rabbit连接字符串");
            ReloadLoggerOptions(_RabbitConnectOptions.CurrentValue);
        }
        /// <summary>
        /// 初始化连接
        /// </summary>
        private void InitConnect()
        {
            // 1.实例化连接工厂
            var factory = new ConnectionFactory()
            {
                HostName = _RabbitConnectOptions.CurrentValue.HostName,
                Port = _RabbitConnectOptions.CurrentValue.Port,
                UserName = _RabbitConnectOptions.CurrentValue.UserName,
                Password = _RabbitConnectOptions.CurrentValue.Password,
                VirtualHost = string.IsNullOrWhiteSpace(_RabbitConnectOptions.CurrentValue.VirtualHost) ? "/" : _RabbitConnectOptions.CurrentValue.VirtualHost,
            };
            _Connection = factory.CreateConnection();
            _Connection.ConnectionUnblocked += ConnectionUnblocked;
            _Connection.ConnectionShutdown += ConnectionShutdown;
            _Connection.ConnectionBlocked += ConnectionBlocked;
            _Connection.ConnectionRecoveryError += ConnectionRecoveryError;
             
        }
        /// <summary>
        /// SDK自动重连 还是出错执行的方法
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        private void ConnectionRecoveryError(object sender, RabbitMQ.Client.Events.ConnectionRecoveryErrorEventArgs e)
        {
        }
        /// <summary>
        /// 无法连接的时候执行此方法
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        private void ConnectionBlocked(object sender, RabbitMQ.Client.Events.ConnectionBlockedEventArgs e)
        {
             
        }
        /// <summary>
        /// 连接断开的时候支持此方法,SDK 默认会自动触发自动重连机制
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        private void ConnectionShutdown(object sender, ShutdownEventArgs e)
        {
        }
        /// <summary>
        /// 
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        private void ConnectionUnblocked(object sender, EventArgs e)
        {
            
        }
        /// <summary>
        /// 
        /// </summary>
        /// <param name="options"></param>
        private void ReloadLoggerOptions(RabbitConnectOptions options)
        {
            InitConnect();//配置修改后从新连接
            if (options.EnableBuffer)
            {
                MessageQueueBufferSend();
            }
            _optionsReloadToken = _RabbitConnectOptions.OnChange(ReloadLoggerOptions);
        }
        /// <summary>
        /// 
        /// </summary>
        /// <param name="message"></param>
        public void Write(LoggerMetadata message)
        {
            if (_Connection == null || message == null || !_Connection.IsOpen) return;
            
            try
            {
                // 3.创建信道
                using (var channel = _Connection.CreateModel())
                {
                    channel.ExchangeDeclare(_RabbitConnectOptions.CurrentValue.ExChange, ExchangeType.Direct, _RabbitConnectOptions.CurrentValue.IsDurable);
                    // 4.申明队列(指定durable:true,告知rabbitmq对消息进行持久化)
                    channel.QueueDeclare(
                        queue: _RabbitConnectOptions.CurrentValue.Queue, // 消息队列名称
                        durable: _RabbitConnectOptions.CurrentValue.IsDurable, // 是否缓存
                        exclusive: false,
                        autoDelete: false,
                        arguments: null);
                    channel.QueueBind(_RabbitConnectOptions.CurrentValue.Queue, _RabbitConnectOptions.CurrentValue.ExChange, _RabbitConnectOptions.CurrentValue.RoutingKey, null);
                    // 将消息标记为持久性 - 将IBasicProperties.SetPersistent设置为true
                    var properties = channel.CreateBasicProperties();
                    properties.Persistent = true;

                    // 5.构建byte消息数据包
                    var jsonSetting = new JsonSerializerSettings();
                    jsonSetting.DateFormatString= "yyyy-MM-dd HH:mm:ss";
                    jsonSetting.ContractResolver = new CamelCasePropertyNamesContractResolver();
                    var body = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(message, jsonSetting));
                    // 6.发送数据包(指定basicProperties)
                    channel.BasicPublish(
                        exchange: _RabbitConnectOptions.CurrentValue.ExChange,
                        routingKey: _RabbitConnectOptions.CurrentValue.RoutingKey,
                        basicProperties: properties,
                        body: body);
                }
            }
            catch (Exception ex)
            {
                 
            }
        }
         
        /// <summary>
        /// 
        /// </summary>
        /// <param name="state"></param>
        private void MessageQueueBufferSend()
        {
            Task.Factory.StartNew(() =>
            {
                try
                {
                    // GetConsumingEnumerable 会导致线程阻塞,所以该 Task 正常情况下永远不会被释放
                    foreach (var message in _messageQueue.GetConsumingEnumerable())
                    {
                        Write(message);
                    }
                }
                catch
                {
                    try
                    {
                        _messageQueue.CompleteAdding();
                    }
                    catch { }
                }
            });
        }
         
        /// <summary>
        /// 
        /// </summary>
        /// <param name="message"></param>
        /// <returns></returns>
        public Task WriteAsync(LoggerMetadata message)
        {
            if (_RabbitConnectOptions.CurrentValue.EnableBuffer && !_messageQueue.IsAddingCompleted)
            {
                //启用缓冲区
                try
                {
                    _messageQueue.TryAdd(message);
                }
                catch (InvalidOperationException) { }
                return Task.CompletedTask;
            }
            else
            {
                return Task.Factory.StartNew(() =>
                {
                    try
                    {
                        Write(message);
                    }
                    catch (Exception ex)
                    {
                        Console.WriteLine(ex);
                    }
                });
            }
        }
         
    }

新建RabbitLogger实现ILogger接口

在该实现类中主要对日志的输出做处理,笔者定义了一个日志消息实体对象,该对象将被序列化成json格式后push到rabbit,然后由Logstash进行解析。

/// <summary>
    /// 日志元数据
    /// </summary>
    public class LoggerMetadata
    {
        /// <summary>
        /// 记录时间
        /// </summary>
        public DateTime LogTime { get; set; }
        /// <summary>
        /// 日志级别
        /// </summary>
        public string LogLevel { get; set; }
        /// <summary>
        /// 线程Id
        /// </summary>
        public int ThreadId { get; set; }
        /// <summary>
        /// 事件Id
        /// </summary>
        public int EventId { get; set; }
        /// <summary>
        /// 事件名称
        /// </summary>
        public string EventName { get; set; }
        /// <summary>
        /// 日志类别
        /// </summary>
        public string LogName { get; set; }
        /// <summary>
        /// 日志作用域
        /// </summary>
        public string LogScope { get; set; }
        /// <summary>
        /// 自定义消息
        /// </summary>
        public string Message { get; set; }
        /// <summary>
        /// 错误日志内容
        /// </summary>
        public string Exception { get; set; }
        /// <summary>
        /// 应用程序名称,可以自定义,会携带到消息记录中
        /// </summary>
        public string ApplicationName { get; set; }
        /// <summary>
        /// 当前所处的应用程序域
        /// </summary>
        public string AppDomainName { get; set; }
    }

下面贴上 RabbitLogger 的核心实现

 /// <summary>
    /// 
    /// </summary>
    public class RabbitLogger : ILogger
    {
        /// <summary>
        /// 
        /// </summary>
        public string Name { get; private set; }
        /// <summary>
        /// 
        /// </summary>
        public IExternalScopeProvider ScopeProvider { get; set; }
        /// <summary>
        /// 
        /// </summary>
        public RabbitLoggerOptions Options { get; set; }
        /// <summary>
        /// 
        /// </summary>
        private readonly ILoggerPersistence _LoggerPersistence;
        /// <summary>
        /// 
        /// </summary>
        /// <param name="name"></param>
        /// <param name="loggerPersistence"></param>
        public RabbitLogger(string name, ILoggerPersistence loggerPersistence)
        {
            this.Name = name;
            _LoggerPersistence = loggerPersistence;
        }
        /// <summary>
        /// 
        /// </summary>
        /// <typeparam name="TState"></typeparam>
        /// <param name="state"></param>
        /// <returns></returns>
        public IDisposable BeginScope<TState>(TState state)
        {
            return ScopeProvider?.Push(state);
        }
         
        /// <summary>
        /// 
        /// </summary>
        /// <param name="logLevel"></param>
        /// <returns></returns>
        public bool IsEnabled(LogLevel logLevel)
        {
            return logLevel >= Options.MinLevel;
        }
        /// <summary>
        /// 
        /// </summary>
        /// <typeparam name="TState"></typeparam>
        /// <param name="logLevel"></param>
        /// <param name="eventId"></param>
        /// <param name="state"></param>
        /// <param name="exception"></param>
        /// <param name="formatter"></param>
        public void Log<TState>(LogLevel logLevel, EventId eventId, TState state, Exception exception, Func<TState, Exception, string> formatter)
        {
            if (!IsEnabled(logLevel))
            {
                return;
            }
            if (formatter == null)
            {
                return;
            }
            var message = formatter(state, exception);
            if (!string.IsNullOrEmpty(message) || exception != null)
            {
                try
                {
                    WriteMessage(logLevel, Name, eventId, message, exception);
                }
                catch (Exception ex)
                {
                    // 防止因为异常记录服务出现异常 导致其他服务不能使用,所以这里捕获但是不处理和记录任何信息
                    Console.WriteLine(ex.ToString());
                }
            }
        }
        /// <summary>
        /// 
        /// </summary>
        /// <param name="logLevel"></param>
        /// <param name="logName"></param>
        /// <param name="eventId"></param>
        /// <param name="message"></param>
        /// <param name="exception"></param>
        public virtual void WriteMessage(LogLevel logLevel, string logName, EventId eventId, string message, Exception exception)
        {
             
            var ent = new LoggerMetadata();
            #region 第一段换行输出
            // 时间
            ent.ApplicationName = Options.ApplicationName;
            ent.AppDomainName = AppDomain.CurrentDomain.FriendlyName;
            ent.LogTime = DateTime.Now;
             
            //日志级别
            ent.LogLevel = logLevel.ToString();
            
            //线程id + 事件Id+时间名称
            ent.ThreadId = Thread.CurrentThread.ManagedThreadId;
            ent.EventId = eventId.Id;
            ent.EventName = eventId.Name;
             
            // 日志类别名称,一般为类名
            ent.LogName = logName;
            

            // 日志作用域
            ent.LogScope=GetScopeInformation();
            #endregion

            #region 第二段换行输出
            ent.Message = message;
            
            #endregion

            #region 第三段缓存输出
            ent.Exception = exception?.ToString();
             
            #endregion
            if (Options.EnableAsync)
            {
                _LoggerPersistence.WriteAsync(ent);//logBuilder.ToString()
            } else
            {
                _LoggerPersistence.Write(ent);//logBuilder.ToString()
            }
             
        }
         
    }


作者:暗夜余晖

本文版权归作者所有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。

0

支持

0

反对

posted @2019-1-23  拜读(831)

评论列表

评论内容:



喜欢请打赏

支付宝 微信

请放心支付