一个轻巧高效的多线程c++stream风格异步日志(二)


前言

本文紧接上一篇文章: 介绍上文中的一条条日志是如何异步导入本地文件的.

首先会简单介绍下LogFile类,之后会具体讲解下AsyncLogging中的双缓冲机制.

整个日志模块的结构图,

LogFile类

LogFile日志文件类 完成日志文件的管理工作.

rollFile() :滚动文件 当日志超过m_rollSize大小时会滚动一个新的日志文件出来.

getLogFileName() :用与滚动日志时,给日志文件取名,以滚动时间作为后缀.

m_mutex :用于append()数据时,给文件上锁.

append() :黏入日志.

flush() :冲刷缓冲.

LogFile 有一个AppendFIle类,它是最终用于操作本地文件的类.

append() : 里面会调用系统函数fwrite()写入本地文件.

flush() : 冲刷缓冲.

writtenBytes() : 获取已写字节数.

AsyncLogging类

AsyncLogging异步日志类, 完成日志的异步写入工作.

介绍它的接口前,先描述下它的工作逻辑.

AsyncLogging 有以下述几类缓存.

m_currentBuffer : 指向当前接收其他线程append过来的日志的缓存.

m_buffers : 用于存放当前已写满或过了冲刷周期的日志缓存的指针容器.

m_nextBuffer : 指向当m_currentBuffer满后用于替代m_currentBuffer的缓存.

backupBuffer1 : 备用缓存.

backupBuffer2 : 备用缓存.

buffersToWrite : 和m_buffers通过交换swap()后append()到LogFile的指针容器.

AsyncLogging 使用的双缓冲机制 有两个缓存容器 : m_buffers 、buffersToWrite 交替使用 . 一下我们简称为 A 和 B .

A 用于接收 其他线程 append() 进来的日志.

B 用于将目前已接受的缓存 写入 日志文件. 当B写完时 , clean() B , 交换A,B,如此往复.

优点 : 新建的日志不必等待磁盘操作,也避免了每条新日志都触发日志线程,而是将多条日志拼程一个大的buffer 传送给日志线程写入文件. 相当于批处理, 减少线程唤醒频率 ,降低开销。

另外 ,为了及时将 日志消息写入文件, 即是 buffer A 中还没有push进来日志 也会每三秒 执行一次上述的写入操作.

AsyncLogging使用一个更大的LogBuffer来保存一条条Logger传送过来的日志.

Mutex :用来控制多线程的写入.

Condition : 用来等待缓冲区中的数据.

Thread : 使用一个线程处理缓存的交换,以及日志的写入.

AsyncLogging实现

下面会给出AsyncLogging的简单实现.

实际上还有几个备用缓存,这里没有加上去,以便于理解程序; 备用缓存主要是为了减少反复new 操作带来的系统开销,

#ifndef _ASYNC_LOGGING_HH
#define _ASYNC_LOGGING_HH
#include "MutexLock.hh"
#include "Thread.hh"
#include "LogStream.hh"
#include "ptr_vector.hh"
#include "Condition.hh" #include <string> class AsyncLogging
{
public:
AsyncLogging(const std::string filePath, off_t rollSize, int flushInterval = 3);
~AsyncLogging(); void start(){
m_isRunning = true;
m_thread.start();
} void stop(){
m_isRunning = false;
m_cond.notify();
} void append(const char *logline, int len); private:
AsyncLogging(const AsyncLogging&);
AsyncLogging& operator=(const AsyncLogging&); void threadRoutine(); typedef LogBuffer<kLargeBuffer> Buffer;
typedef oneself::ptr_vector<Buffer> BufferVector;
typedef oneself::auto_ptr<Buffer> BufferPtr; const int m_flushInterval;
bool m_isRunning;
off_t m_rollSize;
std::string m_filePath;
Thread m_thread;
MutexLock m_mutex;
Condition m_cond; BufferPtr m_currentBuffer;
BufferVector m_buffers;
}; #endif //AsyncLogging.cpp
#include "AsyncLogging.hh"
#include "LogFile.hh"
#include <assert.h>
#include <stdio.h> AsyncLogging::AsyncLogging(const std::string filePath, off_t rollSize, int flushInterval)
:m_filePath(filePath),
m_rollSize(2048),
m_flushInterval(flushInterval),
m_isRunning(false),
m_thread(std::bind(&AsyncLogging::threadRoutine, this)),
m_mutex(),
m_cond(m_mutex),
m_currentBuffer(new Buffer),
m_buffers()
{
} AsyncLogging::~AsyncLogging(){
if(m_isRunning) stop();
} void AsyncLogging::append(const char* logline, int len){
MutexLockGuard lock(m_mutex);
if(m_currentBuffer->avail() > len){
m_currentBuffer->append(logline, len);
}
else{
m_buffers.push_back(m_currentBuffer.release()); m_currentBuffer.reset(new Buffer); m_currentBuffer->append(logline, len);
m_cond.notify();
}
} void AsyncLogging::threadRoutine(){
assert(m_isRunning == true);
LogFile output(m_filePath, m_rollSize, false);
BufferVector buffersToWrite;
buffersToWrite.reserve(8); while(m_isRunning){
assert(buffersToWrite.empty());
{
MutexLockGuard lock(m_mutex);
if(m_buffers.empty()){
m_cond.waitForSeconds(m_flushInterval);
}
m_buffers.push_back(m_currentBuffer.release());
m_currentBuffer.reset(new Buffer);
m_buffers.swap(buffersToWrite);
} assert(!buffersToWrite.empty()); for(size_t i = 0; i < buffersToWrite.size(); ++i){
output.append(buffersToWrite[i]->data(), buffersToWrite[i]->length());
} buffersToWrite.clear();
output.flush();
} output.flush();
}

增加备用缓存

增加备用缓存优化上面程序,上面程序一共在两个地方执行了new操作.

1.m_currentBuffer 填满时,需要把它填进容器的时候.

2.到时间了需要把m_currentBuffer里面的内容写入本地文件时,会把它当前的内容移出来,这时候需要new一个新缓存来给m_currentBuffer.

于是我们准备一个m_nextBuffer来做m_currentBuffer的备用缓存.同时在线程中增加两个backupBuffer 给m_nextBuffer 当备用缓存;当日志量大到不够用的时候, 再考虑用new 操作来动态添加缓存。

#ifndef _ASYNC_LOGGING_HH
#define _ASYNC_LOGGING_HH
#include "MutexLock.hh"
#include "Thread.hh"
#include "LogStream.hh"
#include "ptr_vector.hh" #include "Condition.hh"
#include <memory>
#include <string> class AsyncLogging
{
public:
AsyncLogging(const std::string filePath, off_t rollSize, int flushInterval = 3);
~AsyncLogging(); void start(){
m_isRunning = true;
m_thread.start();
} void stop(){
m_isRunning = false;
m_cond.notify();
} void append(const char *logline, int len); private:
AsyncLogging(const AsyncLogging&);
AsyncLogging& operator=(const AsyncLogging&); void threadRoutine(); typedef LogBuffer<kLargeBuffer> Buffer;
typedef myself::ptr_vector<Buffer> BufferVector;
typedef std::unique_ptr<Buffer> BufferPtr; const int m_flushInterval;
bool m_isRunning;
off_t m_rollSize;
std::string m_filePath;
Thread m_thread;
MutexLock m_mutex;
Condition m_cond; BufferPtr m_currentBuffer;
BufferPtr m_nextBuffer;
BufferVector m_buffers;
}; #endif //AsynvLogging.cpp
#include "AsyncLogging.hh"
#include "LogFile.hh"
#include <assert.h>
#include <stdio.h> AsyncLogging::AsyncLogging(const std::string filePath, off_t rollSize, int flushInterval)
:m_filePath(filePath),
m_rollSize(rollSize),
m_flushInterval(flushInterval),
m_isRunning(false),
m_thread(std::bind(&AsyncLogging::threadRoutine, this)),
m_mutex(),
m_cond(m_mutex),
m_currentBuffer(new Buffer),
m_nextBuffer(new Buffer),
m_buffers()
{
} AsyncLogging::~AsyncLogging(){
if(m_isRunning) stop();
} void AsyncLogging::append(const char* logline, int len){
MutexLockGuard lock(m_mutex);
if(m_currentBuffer->avail() > len){
m_currentBuffer->append(logline, len);
}
else{
m_buffers.push_back(m_currentBuffer.release()); if(m_nextBuffer){
m_currentBuffer = std::move(m_nextBuffer);
}
else{
m_currentBuffer.reset(new Buffer);
} m_currentBuffer->append(logline, len);
m_cond.notify();
}
} void AsyncLogging::threadRoutine(){
assert(m_isRunning == true);
LogFile output(m_filePath, m_rollSize, false);
BufferPtr backupBuffer1(new Buffer);
BufferPtr backupBuffer2(new Buffer);
BufferVector buffersToWrite;
buffersToWrite.reserve(8); while(m_isRunning){
assert(buffersToWrite.empty());
{
MutexLockGuard lock(m_mutex);
if(m_buffers.empty()){
m_cond.waitForSeconds(m_flushInterval);
}
m_buffers.push_back(m_currentBuffer.release());
m_currentBuffer = std::move(backupBuffer1);
m_buffers.swap(buffersToWrite);
if(!m_nextBuffer)
m_nextBuffer = std::move(backupBuffer2);
} assert(!buffersToWrite.empty()); for(size_t i = 0; i < buffersToWrite.size(); ++i){
output.append(buffersToWrite[i]->data(), buffersToWrite[i]->length());
} if(buffersToWrite.size() > 2)
{
// drop non-bzero-ed buffers, avoid trashing
buffersToWrite.resize(2);
} if(!backupBuffer1)
{
assert(!buffersToWrite.empty());
backupBuffer1 = std::move(buffersToWrite.pop_back());
backupBuffer1->reset();
} if(!backupBuffer2)
{
assert(!buffersToWrite.empty());
backupBuffer2 = std::move(buffersToWrite.pop_back());
backupBuffer2->reset();
} buffersToWrite.clear();
output.flush();
} output.flush();
}

结语

本文主要介绍了muduo中AsyncLogging类的实现,其中的双缓存机制.

LogFile类及AppendFIle类 分别是日志文件管理类和本地文件的基本操作类. 不难理解,感兴趣的话可以看看muduo的源码,本文不再往下写了,如果想要全部源码可以留言。

最新源码:

https://github.com/BethlyRoseDaisley/SimpleMuduo/tree/master/AsyncLogging

最新文章

  1. Jackson轻易转换JSON
  2. iOS - Card Identification 银行卡号识别
  3. java io系列16之 PrintStream(打印输出流)详解
  4. 介绍开源的.net通信框架NetworkComms框架 源码分析(二)ConnectionInfo
  5. Android 怎么使用Bitmap+Canvas 自适应屏幕
  6. 前端面试题:css相关面试题
  7. Java中高级面试题
  8. 在nuxt中加入element-ui插件遇到的问题
  9. [日常] PKUWC 2018爆零记
  10. Gradle入门到实战(二) — ImageOptimization安卓图片转换压缩插件
  11. fiddler 一些不为人知的功能
  12. 记一次gitlab-ce数据恢复过程
  13. 运维常用mysql语句
  14. numpy中pad函数的常用方法
  15. hdu3518
  16. Python学习笔记之面向对象编程(三)Python类的魔术方法
  17. CentOS 6.5 搭建cuda环境
  18. hdu6447 YJJ&#39;s Salesman
  19. UML类图中的各种箭头代表的含义(转自:http://www.cnblogs.com/damsoft/archive/2016/10/24/5993602.html)
  20. 关于jquery的on,你怎么绑定就怎么解除

热门文章

  1. Python基础【day03】:文件操作(六)
  2. C语言复习---二维数组和二级指针的关系:没关系,别瞎想(重点)
  3. AttributeError: &#39;module&#39; object has no attribute &#39;X509_up_ref&#39;
  4. html5 canvas 径向渐变
  5. Django中合并同一个model的多个QuerySet
  6. 第6月第17天 CGAffineTransformMake(a,b,c,d,tx,ty) 矩阵运算的原理
  7. SQL Server 索引(一)数据结构和存储结构
  8. WPF 中定时器的使用
  9. OracleDBConsoleorcl 启动不了 服务特定错误2【解决办法】
  10. POI操作Excel详解,HSSF和XSSF两种方式