一个简易的stream风格的异步日志系统的实现
之前阅读过一片有关于日志系统的论文,其中介绍了一种常见的日志系统的结构,可以分为frontend和backend两部分,前者被日志系统的User Thread调用,生成日志消息字符串到缓冲区中,后者往往作为一个专门的后台线程运行,接收来自frontend的内容,以某种方式将其flush到日志文件之中.在这里所介绍的日志系统中,尤其是在backend部分运用了不少可圈可点的trick来降低开销.
不过在我的实现中,我有些地方也没有做全,比如说日志级别的判断以及日志文件的滚动等等.
1. frontend
总的来说,如果想要用C++实现这里部分,所涉及的技术大概有这些:
- 运算符重载.将“<<”运算符转化为向缓冲区中添加字符串的操作,其中也包含生成字符串的部分,也就是将int、flout等非字符串类型的操作转化成字符串.
- 匿名对象与RAII.对应User Thread来说,调用日志输出API,比如LOG_DEBUG << "hello, world",其本质上是创建了一个Logger匿名对象,其生命周期只限于这个调用语句中,在构造函数中则是一些关于日志前缀的构造并append buffer,析构函数中则是一些后缀字符串的append,比如"".并且将buffer中的内容传达给backend.
下面介绍实现的细节.
1).Logger
Logger的核心在于一个LoggerStream对象,该对象中维护了一个专门用来保存日志消息字符串的buffer,并且定义了一些和append buffer相关的"<<"重载.除了LoggerStream对象之外,还包含了像log level,timestamp等信息.其中数据成员如下:
unsigned int line_number_;
LogLevel level_;
LoggerStream stream_;
TimeStamp time_stamp_; // 日志消息的生成时间
Logger可以说是User Thread直接打交道的东西,但外面还包含了一层宏定义,使得User可以以“LOG_DEBUG <<”这样的方式调用.如下所示:
#define LOG_INFO \
TinyMuduo::Logger(__FILE__, __LINE__, TinyMuduo::Logger::LogLevel::INFO, __FUNCTION__).getStream()
#define LOG_ERROR \
TinyMuduo::Logger(__FILE__, __LINE__, TinyMuduo::Logger::LogLevel::ERROR, __FUNCTION__).getStream()
#define LOG_DEBUG \
TinyMuduo::Logger(__FILE__, __LINE__, TinyMuduo::Logger::LogLevel::DEBUG, __FUNCTION__).getStream()
#define LOG_FATAL \
TinyMuduo::Logger(__FILE__, __LINE__, TinyMuduo::Logger::LogLevel::FATAL, __FUNCTION__).getStream()
由此可见,其实就是创建一个Logger临时对象,并且借助其stream.一句LOG语句大概可以分为三步:
- Logger构造函数的调用.如下所示:
Logger::Logger(const char * filename, unsigned int lineno, LogLevel level, const char *func):
line_number_(lineno),
level_(level),
stream_(),
time_stamp_(TimeStamp::getNowTimeStamp()) {
stream_ << "[" << time_stamp_.getTimeFormatString() << "]";
stream_ << "[tid:" << CurrThreadSpace::getCurrThreadId() << "]";
stream_ << "[" << getFileName(filename) << ": " << lineno;
stream_ << " " << func << "]";
stream_ << "[" << LevelMapVec[level_] << "]";
}
其实本质上也就是利用其中的stream对象的"<<"将一些日志前缀信息加入到其中的buffer中.
- 之后是利用Logger匿名对象返回的stream中的"<<"来处理日志消息的内容.
- 最后,由于匿名对象的生命周期只限于这一条语句中,因此最终会触发Logger的析构函数.如下所示:
Logger::~Logger() { // RAII真是无处不在
// 将buffer中的内容代入到output函数中
stream_ << "\n";
auto buffer = stream_.getBuffer();
LoggerOutput(buffer, stream_.getBufferLen());
}
LoggerOutput函数用于将buffer中的内容传送到日志后端,日志后端的class中,也就是调用AsyncLogging提供的一个append接口.
2).LoggerStream与FixedBuffer
LoggerStream中只包含了一个FixBuffer作为数据成员,FixedBuffer的实现很简单没什么好说的,就是一个固定大小的Buffer,其中有一个类似于迭代器的指针作为Buffer上的游标,其相关接口主要是为了实现append服务,就像相关的迭代器一样,比如说begin,end,getAvail等等.
LoggerStream中关于"<<"重载的操作如下,以Int系的为例,举个例子:
template<class InterType>
void LoggerStream::appendForInter(InterType val) {
std::string str(std::to_string(val));
append(str.c_str(), str.length());
}
LoggerStream &LoggerStream::operator<<(int val) {
appendForInter<int>(val);
return *this;
}
LoggerStream &LoggerStream::operator<<(uint val) {
appendForInter<uint>(val);
return *this;
}
LoggerStream &LoggerStream::operator<<(int64_t val) {
appendForInter<int64_t>(val);
return *this;
}
关于数字向字符串的转化,我使用了std::to_string进行.本质上就是调用了Buffer的append接口.关于Buffer中append的实现细节如下:
void append(const char *content, size_t len) {
// 需要考虑的是可用的条件
if (getAvail() >= len) {
memcpy(curr_, content, len);
curr_ += len;
}
}
简而言之就是,拷贝内容并且移动指针即可.
2. backend
通过之前的介绍,我们就了解到User Threads和Background thread构成了一个个生产者和一个消费者的关系,因此在backend中的设计中首先要考虑的是如何设计frontend到backend到接口,也就是上面提到的AsyncLogging的append接口,这一部分本质上仍然是在User Thread中进行的, 日志系统的设计中,尽可能将少给User Thread带来耗时操作是关键,因此后面我们在关于append接口的设计中,我们需要考虑,这一部分有可能带来哪些耗时操作,如何避免或者将其代价尽可能降低.
除了frontend和backend的交互部分,还有另一部分的设计也需要考虑.也就是Background thread本身的运行逻辑,这一部分往往是一个循环体,周期性地等待并将收集好的日志消息写入到日志文件中.
在这里有一个简单而直观的办法,就是维护一个由Buffer所组成的队列,append就是将Logger的Buffer将入到该队列中,在主循环中,每当队列中的Buffer数量到达一定的数量,就将其写入到日志文件中,或者超过指定的Interval.但是这种设计的缺点很明显,就是由Buffer所组成的队列中造成的内存碎片过大,造成内存浪费.日志的长度往往长短不一,Logger中的Buffer为了能够容纳绝大多数日志消息的场景,因此都设置的挺大,我在我的实现中用了4096个字节,而大多数日志消息也就几十字节的样子,因此每个加入到队列中的Buffer都有巨大的内存碎片.
Logger(LoggerStream)中也维护了一个Buffer,其size往往也远大于日志消息字符串的长度,为什么不需要考虑这种内存碎片问题?
因为Logger本身是作为一个匿名对象被调用的,作为一个栈上对象其生命周期只存在于一个LOG语句之中,因此很快就被析构了,给内存带来的压力并不大.
因此为了应对这个问题,再增加一个curr_buffer作为被User Thread直接写的Buffer,等到这个Buffer写满到一定大小之后,再加入到Buffer队列中,如果遇到curr_buffer已经放不开当前User Thread所产生的字符串的时候,就需要先将该Buffer放入到队列中,并且新分配一个新的Buffer,然后再加入到其中.这个设计方案确实避免了前面提到的内存碎片的问题,但是还有没有什么可以优化的余地?其中应当避免的耗时操作又是什么呢?
其中的耗时操作也就是curr_buffer写满时要new出来一个新的Buffer,因为内存分配是一笔较大的开销.muduo中着力于解决这个问题,其基本思想是在Background Thread给User Thread预备地留出可用的Buffer,因此避免了User Thread在append中因分配内存所带来的额外开销(正如前面所说的,我们应该尽可能减少User Thread因日志所造成的耗时操作).或者说这是一种双buffer的设计方案,一个是curr_buffer_另一个是next_buffer,一般情况下,在User Thread中的append中不再需要new出来新的Buffer,直接把next_buffer拿出来用就可以,至于相关的创建操作都转移到Background Thread的执行逻辑中了.这就好像是一个射手,只管开枪射击就好,在他射击的时候就已经有人给他填充好弹药了,射手因此无需射击完后再填充弹药,因此提高了攻击速度.
1).AsncyLogging
其中的核心数据成员如下:
std::atomic_bool is_running_;
uint32_t flush_interval_; // Background Thread等待周期的间隔
std::string logfile_name_; // 所要输出到的日志文件
std::unique_ptr<Thread> log_thread_; // Background Thread
std::mutex mutex_; // 锁
std::condition_variable condition_; // 条件变量
// 双buffer,一个是当前所可写的buffer,一个是预备用的buffer
BufferPtr curr_buffer_;
BufferPtr next_buffer_;
// 写满的buffer都将加入到这里,被Background Thread一批批地处理(批处理)
BufferVec full_buffers_;
2).append
这一部分的具体实现如下:
void AsyncLogging::append(const char *logmsg, size_t len) {
std::unique_lock<std::mutex> uniqueLock(mutex_);
if (curr_buffer_->getAvail() >= len) {
curr_buffer_->append(logmsg, len);
} else {
// 将curr_buffer加入到full_buffer中
full_buffers_.push_back(std::move(curr_buffer_));
if (!next_buffer_) {
next_buffer_ = std::make_unique<Buffer>(); // 新new一个
}
curr_buffer_ = std::move(next_buffer_);
curr_buffer_->append(logmsg, len);
condition_.notify_all(); // 毕竟只有一个正在条件变量上等待的
}
}
由于这里的User Threads和Background Thread之间是一种多生产者+单消费者的模式,因此append中的操作需要一个锁来保护.当curr_buffer大小足够时,直接加入进去就可以了.否则就需要将curr_buffer加入到full_buffers中,然后将next_buffer设置为curr_buffer.并将当前的消息字符串内容加入,随后唤醒正在等待休眠的Background Thread.
什么情况下没有已经分配好的next_buffer可用,仍然需要在append中创建呢?
如果frontend对于append的调用频率过高,导致刚刚创建的buffer就立马被消费完了.
3).logThreadFunc
其中的实现逻辑如下,首先是开始核心循环之前的一些预备操作.
void AsyncLogging::logThreadFunc() { // 这个是日志线程的主要逻辑
BufferPtr new_buffer1 = std::make_unique<Buffer>();
BufferPtr new_buffer2 = std::make_unique<Buffer>();
BufferVec output_buffers;
output_buffers.reserve(DEFAULT_FULLBUFFERS_SIZE);
auto wait_condition = [&]()-> bool {
return !full_buffers_.empty();
};
// 关于输出文件的stream
std::string newlogfile_name = logfile_name_ + "_" +
TimeStamp::getNowTimeStamp().getTimeFormatString();
std::ofstream log_output;
log_output.open(newlogfile_name.c_str(), std::ios::app); // 只能append
这一部分,我通过使用fstream文件流对象作为写入文件的方式.之前我们就提到过,Background Thread有“填充弹药”的义务,也就是将next_buffer的分配转移到了这个地方,一种直观的想法是,在其核心的循环中,每一轮都将next_buffer就new好就可以了.但是在这里的实现中又额外地创建了两个buffer,为什么需要这么做呢? 因为在Background Thread的处理逻辑中不仅仅需要分配next_buffer,还需要分配curr_buffer.至于给curr_buffer分配内存的作用又是什么呢?这需要结合其核心循环体:
while (is_running_) {
{
std::unique_lock<std::mutex> uniqueLock(mutex_);
if (full_buffers_.empty()) {
condition_.wait_for(uniqueLock, flush_interval_ * std::chrono::seconds(1), wait_condition);
}
// 将current加入到full_buffers中
full_buffers_.push_back(std::move(curr_buffer_));
curr_buffer_ = std::move(new_buffer1);
if (!next_buffer_) {
next_buffer_ = std::move(new_buffer2);
}
output_buffers.swap(full_buffers_); // 交换过来, 之后使用output_buffer
}
for (auto &output_buffer : output_buffers) {
// 输出该buffer中的内容
log_output << output_buffer->getData();
}
if (output_buffers.size() > 2) { // 调整大小,只留2个,用于分配给new_buffer1和new_buffer2
output_buffers.resize(2);
}
if (new_buffer1 == nullptr) {
new_buffer1 = std::move(output_buffers.back()); // 利用out_buffers中用过的buffer,也就是之前就分配好的
output_buffers.pop_back();
new_buffer1->reset();
}
if (new_buffer2 == nullptr) { // 同上
new_buffer2 = std::move(output_buffers.back());
output_buffers.pop_back();
new_buffer2->reset();
}
output_buffers.clear(); // 清空
log_output.flush(); // 刷
}
// 循环结束后的末尾.
log_output.flush();
log_output.close();
}
每一轮循环的动作大致分为三个部分:
- 等待并取出.知道有新的buffer被填满,或者interval时间间隔已过的情况,之后将curr_buffer加入到full_buffer中,并且给curr_buffer和next_buffer通过new_buffer1和new_buffer2分配内存(如果需要),将full_buffers移动到临时变量output_buffers中.
为什么需要在给curr_buffer分配内存?
是因为无论当前curr_buffer是否已经写满,我们都需要将curr_buffer中的内容读出来,放入到full_buffers中.正如下面代码中所展示的:
std::unique_lock<std::mutex> uniqueLock(mutex_); if (full_buffers_.empty()) { condition_.wait_for(uniqueLock, flush_interval_ * std::chrono::seconds(1), wait_condition); } // 将current加入到full_buffers中 full_buffers_.push_back(std::move(curr_buffer_)); curr_buffer_ = std::move(new_buffer1); if (!next_buffer_) { next_buffer_ = std::move(new_buffer2); } output_buffers.swap(full_buffers_); // 交换过来, 之后使用output_buffer
如果不将curr_buffer加入到full_buffer中,对于一些日志密度很低的系统中(低到长期填不满一个buffer),将导致没有任何日志消息被写入.就类似于操作系统中所提到的“饥饿”现象.
- 集中写入output_buffers.遍历output_buffers,借助之前打开的文件流对象写入Buffer中的内容.
借助临时变量,缩短临界区,并将耗时操作转移到临界区之外.
之前就提到了一个细节,就是将收集好的Buffer,也就是full_buffers move到output_buffers中.如果不这么做,就需要带着锁去遍历这些Buffer并进行写文件,写文件本身就是耗时的IO操作,如果处于上锁中,将会导致User Thread在获取锁所需要的阻塞时间大大延长了,对性能造成了严重的损耗.
- 预备new_buffer1和new_buffer2.如果new_buffer1或者new_buffer2中有为空的,就将其分配并重置.
利用new_buffer1和new_buffer2进一步减少Background Thread中的内存分配操作
相比直接对curr_buffer和next_buffer进行new,将分配好的new_buffer1和new_buffer2避免了内存分配在Background Thread的耗时操作,但是这两个buffer又是怎样保证时刻可用的呢?
其方式在于,当处理完output_buffers时,将其中废弃不同的buffer移动到new_buffer1和new_buffer2中.如何保证output_buffers中的数量足够用来填充的呢?关于out_buffers,在因为interval超时的情况下,output_buffer的size至少为1,然而这个时候next_buffer始终可用,因此也就不会有new_buffer2被移动给next_buffer的情况,所以这个时候1个就足够了(只需要给new_buffer1).在因为条件变量被User Thread唤醒的情况下,output_buffers的size起码为2,因为append中加进去了一个,在Background Thread被唤醒后又加入了一个.
3. 总结
对于这个日志系统的设计,我最大的启发就是一个好的日志系统,应该尽可能地避免给User Thread带来额外开销,更不应该阻塞.这个设计围绕着双buffer展开了这么多额外的逻辑,其最终目的就是为了使User Thread始终有一个可用的预备Buffer,进而无需new出新的Buffer.
但是个人认为这个系统设计仍然存在一些软肋:
- 众多User Threads都需要获取同一个锁,当线程数量大起来之后,锁竞争的开销就会很大.
- 为了避免“饥饿”现象, Background Thread每当苏醒过来都要将curr_buffer加入,这时curr_buffer往往没有写满,这个过程中可能会导致内存利用率较低的情况.