muduo异步日志实现
陈硕老师的muduo网络库的异步日志的实现,今晚有点晚了,我明晚再把这个异步日志抽出来,作为一个独立的日志库。
所在文件
AsyncLogging.cc
AsyncLogging.h
LogFile.h
LogFile.cc
CountDownLatch.h
CountDownLatch.cc
这个CountDownLatch有点像信号量,但是又只有down操作,上网查了以下类似的,作用有点像屏障
class AsyncLogging : noncopyable
{
public:
AsyncLogging(const string& basename,
off_t rollSize,
int flushInterval = 3);
~AsyncLogging()
{
if (running_)
{
stop();
}
}
void append(const char* logline, int len);
void start()
{
running_ = true;
thread_.start(); // 启动线程
latch_.wait(); // 这里的wait调用其实并不会阻塞
// 主线程
}
void stop() NO_THREAD_SAFETY_ANALYSIS
{
running_ = false;
cond_.notify();
thread_.join();
}
private:
// 默认的守护进程执行的函数
void threadFunc();
/*
Buffer的数据结构
private:
std::vector<char> buffer_;
size_t readerIndex_;
size_t writerIndex_;
static const char kCRLF[];
*/
typedef muduo::detail::FixedBuffer<muduo::detail::kLargeBuffer> Buffer;
typedef std::vector<std::unique_ptr<Buffer>> BufferVector;
typedef BufferVector::value_type BufferPtr; // 表示容器元素的类型
const int flushInterval_; // 刷盘的时间间隔
std::atomic<bool> running_; // 是否运行
const string basename_; // 文件名
const off_t rollSize_; // 日志回滚的大小
muduo::Thread thread_; // 日志类自带一个守护线程来写
muduo::CountDownLatch latch_;
muduo::MutexLock mutex_;
muduo::Condition cond_ GUARDED_BY(mutex_);
BufferPtr currentBuffer_ GUARDED_BY(mutex_); // 当前缓冲
BufferPtr nextBuffer_ GUARDED_BY(mutex_); // 预备缓冲
BufferVector buffers_ GUARDED_BY(mutex_); // 已经写满并等待落盘的缓冲
};
#include <stdio.h>
using namespace muduo;
AsyncLogging::AsyncLogging(const string& basename,
off_t rollSize,
int flushInterval)
: flushInterval_(flushInterval),
running_(false),
basename_(basename),
rollSize_(rollSize),
thread_(std::bind(&AsyncLogging::threadFunc, this), "Logging"),
latch_(1),
mutex_(),
cond_(mutex_),
currentBuffer_(new Buffer),
nextBuffer_(new Buffer),
buffers_()
{
currentBuffer_->bzero();
nextBuffer_->bzero();
buffers_.reserve(16);
}
void AsyncLogging::append(const char* logline, int len)
{
muduo::MutexLockGuard lock(mutex_);
if (currentBuffer_->avail() > len)
{
currentBuffer_->append(logline, len);
}
else
{
buffers_.push_back(std::move(currentBuffer_));
if (nextBuffer_)
{
currentBuffer_ = std::move(nextBuffer_);
}
else
{
// 四个缓冲区都写满了
currentBuffer_.reset(new Buffer); // Rarely happens
}
currentBuffer_->append(logline, len);
// 这里的notify不一定什么时候都有效
// 如果此时守护线程正在工作
// 那么这个信后就会丢失,但是没有造成影响
// 但是有可能守护线程正在条件变量上睡眠
cond_.notify();
}
}
void AsyncLogging::threadFunc()
{
assert(running_ == true);
latch_.countDown(); // 计数器减一,latch_ == 0,并唤醒主线程
LogFile output(basename_, rollSize_, false); // 初始化一个输出流
BufferPtr newBuffer1(new Buffer);
BufferPtr newBuffer2(new Buffer);
newBuffer1->bzero();
newBuffer2->bzero();
BufferVector buffersToWrite; // 相当于一个前后端交互的单元
// 将写满的buffer装到vector中
// 在传输到后端
buffersToWrite.reserve(16);
while (running_)
{
assert(newBuffer1 && newBuffer1->length() == 0);
assert(newBuffer2 && newBuffer2->length() == 0);
assert(buffersToWrite.empty());
{
muduo::MutexLockGuard lock(mutex_);
// 使用条件变量完成定时任务
if (buffers_.empty()) // unusual usage!
{
cond_.waitForSeconds(flushInterval_);
}
buffers_.push_back(std::move(currentBuffer_));
currentBuffer_ = std::move(newBuffer1);
buffersToWrite.swap(buffers_); // buffers_变成一个空的buffers_
if (!nextBuffer_)
{
nextBuffer_ = std::move(newBuffer2);
}
}
assert(!buffersToWrite.empty());
// 如果日志太多了
if (buffersToWrite.size() > 25)
{
char buf[256];
snprintf(buf, sizeof buf, "Dropped log messages at %s, %zd larger buffers\n",
Timestamp::now().toFormattedString().c_str(),
buffersToWrite.size()-2);
fputs(buf, stderr);
// 先输出一个报警的日志
output.append(buf, static_cast<int>(strlen(buf)));
// 清除多余的日志
buffersToWrite.erase(buffersToWrite.begin()+2, buffersToWrite.end());
}
for (const auto& buffer : buffersToWrite)
{
// FIXME: use unbuffered stdio FILE ? or use ::writev ?
output.append(buffer->data(), buffer->length());
}
if (buffersToWrite.size() > 2)
{
// drop non-bzero-ed buffers, avoid trashing
// vector底层是智能指针不用担心内存泄露
buffersToWrite.resize(2);
}
if (!newBuffer1)
{
assert(!buffersToWrite.empty());
newBuffer1 = std::move(buffersToWrite.back());
buffersToWrite.pop_back();
newBuffer1->reset();
}
if (!newBuffer2)
{
assert(!buffersToWrite.empty());
newBuffer2 = std::move(buffersToWrite.back());
buffersToWrite.pop_back();
newBuffer2->reset();
}
buffersToWrite.clear();
output.flush();
}
output.flush();
}