IO多路转接 - epoll
一、I/O多路转接之 epoll
1. epoll 接口
(1)epoll_create()
首先 epoll_create()
这个接口就是帮我们创建一个 epoll 模型,这个模型是什么我们后面介绍原理的时候再讲。
其中 epoll_create()
的接口如下:
其中 epoll_create1()
是新标准,我们不介绍。而 epoll_create()
的参数 size 也已经废弃了,这个参数传什么也无所谓了,只要大于 0 就可以了。它的返回值也是一个文件描述符,成功则返回一个新的文件描述符,失败则返回 -1.
(2)epoll_wait()
当 epoll 模型创建好之后,我们想往这个 epoll 模型中新增一个要关心的 fd 及其事件;修改一个或者删除一个文件描述符及其事件;就需要用到 epoll_wait()
这个接口。该接口如下:
epoll_wait() 本质就是获取已经就绪的文件描述符。第一个参数 epfd 就是 epoll_create() 的返回值;第二个和第三个参数就是我们将来定义的一个用户级缓冲区,返回已经就绪的 fd 和 事件;最后一个参数的含义和 poll 的 timeout 一模一样,单位为毫秒。而返回值表示已经就绪的文件描述符的个数。
其中我们看到第二个参数中带有 struct epoll_event
这个类型的结构体,这个结构体是什么呢?我们来看一下:
如上图,epoll_event 中的 events 表示哪些事件,它的类型是 uint32_t,也就是一个位图,和 poll 中的 events 一样,以位图的形式传递标记位事件;而第二个字段 data 的类型 epoll_data_t 是一个联合体,就是可以选择该联合体字段中的任意一个,通常用来保存的是用户级的数据,有关这个字段我们后面再说。
其中 events 可以是以下几个宏的集合:
- EPOLLIN : 表示对应的文件描述符可以读 (包括对端SOCKET正常关闭);
- EPOLLOUT : 表示对应的文件描述符可以写;
- EPOLLPRI : 表示对应的文件描述符有紧急的数据可读 (这里应该表示有带外数据到来);
- EPOLLERR : 表示对应的文件描述符发生错误;
- EPOLLHUP : 表示对应的文件描述符被挂断;
- EPOLLET : 将 EPOLL 设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的;
- EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个 socket 的话,需要再次把这个 socket 加入到 EPOLL 队列里。
(3)epoll_ctl()
为了使用这个 epoll,首先我们也需要将 listen 套接字添加到 epoll 模型里,所以就需要用到 epoll_ctl()
接口。该接口如下:
它的作用主要是我们想向系统里新增一个文件描述符,及其要关心的事件,想要修改一个特定的文件描述符关心的事件。所以 epoll_ctl()
支持 epoll 来进行相关的管理工作。
其中第一个参数就是 epoll_create() 的返回值。第二个参数 op 就是以下三个选项的其中一个,分别代表增加,修改,删除:
第三个参数和第四个参数代表哪一个文件描述符上的哪些事件。
所以我们需要使用 epoll 的话,需要使用以上三个系统调用,而 select 和 poll 都只有一个系统调用。
2. epoll 原理
无论是 select 和 poll 都是用数组来管理文件描述符和对应的事件,更重要的是该数组这个数据结构是由用户来维护的!接下来我们解释一下 epoll 的原理,就能明白为什么需要三个系统调用来使用 epoll 了。
操作系统在硬件层面,通过硬件中断的方式知道网卡上有数据了,然后通过网卡驱动上的方法将数据拷贝到网卡驱动上的数据链路层。同时,操作系统为了支持 epoll,它为我们提供了三种机制:
- 操作系统在内部会帮我们维护一颗红黑树
其中红黑树中的节点包含的最重要的字段是:int fd
和 uint32_t event
,分别代表内核要关心的文件描述符和要关心的事件。
- 操作系统会为我们维护一个就绪队列
一旦红黑树中有特定的一个节点,比如某个节点上的文件描述符的某个事件就绪了,就可以把该节点添加到就绪队列中;其中该就绪队列中每个节点中的字段包含 int fd
和 uint32_t event
,分别代表已经就绪的文件描述符和已经就绪的事件。
- 操作系统的底层网卡驱动是允许操作系统去注册一些回调机制的
操作系统内部会提供一个回调函数,这个回调函数是用来干什么的呢?首先网卡通过硬件中断的方式将数据搬到了网卡驱动层。当网卡驱动层中的数据链路层有数据就绪了,主动会调用该回调函数。然后该回调方法会做如下几个操作:
- 向上交付
- 交付给 TCP 的接收队列
- 根据文件描述符为键值查找红黑树,确认这个接受队列和哪一个文件描述符是关联的,再判断该 fd 是否关心了 EPOLLIN 或者 EPOLLOUT 读写事件,如果有,由于数据已经就绪,所以接下来第四步
- 构建就绪节点,插入到就绪队列中
实际上我们用 epoll 的时候,操作系统就会把该回调函数注册到底层,然后底层数据一旦就绪就会自动回调执行上面的四个方法。所以对于用户来说,只需要在就绪队列中获取就绪节点即可!整套机制都是由操作系统完成的!
我们把这三套机制叫做 epoll 模型,如下图:
其中 eventpoll 为 epoll 对象;epitem 为红黑树的节点。
所以接口 epoll_create()
创建 epoll 模型本质就是创建红黑树,创建就绪队列以及注册底层的回调机制。所以该 epoll 模型怎么让进程找到呢?其实给 epoll 模型放入到 struct file 对象即可,把它也当作文件!因为在 Linux 中一切皆文件!struct file 中也有指针指向 epoll 模型!所以再把该 struct file 对象添加到进程的文件描述符表中即可!
所以 epoll_create()
实质上就是在操作系统中创建 struct file,其中的指针指向整个 epoll 对象,对应的文件描述符就能挂接到进程的文件描述符表中,最后把该文件描述符返回给用户,所以我们就可以通过该文件描述符找到 struct file 并找到 epoll 模型了。epoll_ctl()
实质上的增加、修改、删除都是在对红黑树进行操作。其中 epoll_wait()
的第二个参数是输出型参数,它会将就绪队列中所有就绪的节点一个一个地放进 struct epoll_event
里。
3. epoll 的优点
基于 epoll 的原理,我们可以得到 epoll 的优势:
- 检测就绪的时间复杂度为 O(1),因为只需要看队列是否为空就可以了。而获取就绪的时间复杂度为 O(n),因为需要将就绪队列中的节点一个一个拷贝到应用层。
- fd 和 event 没有上限,因为该红黑树有多大由操作系统说了算
- 由于该红黑树是操作系统帮我们维护的,所以不需要在用户层由用户维护一个数组这样的数据结构,来管理所有的文件描述符及其要关心的事件了
- epoll_wait() 的返回值 n,表示有 n 个 fd 就绪了,那么该接口还会将已经就绪的节点放入到它的输出型参数 events 中,所以就绪事件是连续的,有 n 个!这意味着,上层用户处理已经就绪的事件,就不再需要像以前一样检测有哪些 fd 是非法的,哪些是没有就绪的了;只需要根据返回值 n,遍历 events 即可!
4. epoll 的使用
我们对 epoll 的相关接口进行一下简单的封装成为 Epoller.hpp,如下:
#pragma once
#include "NoCopy.hpp"
#include "log.hpp"
#include <sys/epoll.h>
#include <cstring>
#include <cerrno>
class Epoller : public NoCopy
{
static const int size = 128;
public:
Epoller()
{
_epfd = epoll_create(size);
if(_epfd == -1){
lg(Fatal, "epoll_create error: %s", strerror(errno));
}
else{
lg(Info, "epoll_create success: %d", _epfd);
}
}
int EpollerWait(struct epoll_event revents[], int num)
{
int n = epoll_wait(_epfd, revents, num, _timeout);
return n;
}
int EpollerCtl(int oper, int sockfd, uint32_t event)
{
int n = 0;
if(oper == EPOLL_CTL_DEL)
{
n = epoll_ctl(_epfd, oper, sockfd, nullptr);
if(n != 0){
lg(Error, "epoll_ctl delete error!");
}
}
else
{
// EPOLL_CTL_MOD 或者 EPOLL_CTL_ADD
// 设置进内核的红黑树中
struct epoll_event ev;
ev.events = event;
ev.data.fd = sockfd; // 方便我们后面处理的时候知道是哪一个 fd 就绪了
n = epoll_ctl(_epfd, oper, sockfd, &ev);
if(n != 0){
lg(Error, "epoll_ctl error!");
}
}
return n;
}
~Epoller()
{
if(_epfd >= 0){
close(_epfd);
}
}
private:
int _epfd;
int _timeout = 1000;
};
接下来编写 epollSever.hpp,如下:
#pragma once
#include <iostream>
#include <memory>
#include <sys/epoll.h>
#include "Socket.hpp"
#include "log.hpp"
#include "Epoller.hpp"
#include "NoCopy.hpp"
uint32_t EVENT_IN = (EPOLLIN);
uint32_t EVENT_OUT = (EPOLLOUT);
class EpollServer : public NoCopy
{
static const int maxevents = 64;
public:
EpollServer(uint16_t port)
:_port(port)
,_listenSocket_ptr(new Sock())
,_epoller_ptr(new Epoller())
{}
void Init()
{
_listenSocket_ptr->Socket();
_listenSocket_ptr->Bind(_port);
_listenSocket_ptr->Listen();
lg(Info, "create listen socket success: %d\n", _listenSocket_ptr->GetFd());
}
void Accepter()
{
std::string client_ip;
uint16_t client_port;
int sockfd = _listenSocket_ptr->Accept(&client_ip, &client_port);
if(sockfd > 0){
// 不能直接读取,而是将它添加到内核的红黑树中,让 epoll 关心即可
_epoller_ptr->EpollerCtl(EPOLL_CTL_ADD, sockfd, EVENT_IN);
lg(Info, "get a new link, client info@ %s:%d", client_ip.c_str(), client_port);
}
else{
return;
}
}
// for test
void Recver(int fd)
{
char buffer[1024];
ssize_t n = read(fd, buffer, sizeof(buffer) - 1);
if (n > 0)
{
buffer[n] = 0;
std::cout << "get a message: " << buffer << std::endl;
// write
std::string echo_str = "sever echo $ ";
echo_str += buffer;
write(fd, echo_str.c_str(), echo_str.size());
}
else if (n == 0)
{
lg(Info, "client quit, me too, close fd is: %d", fd);
// 先在内核红黑树中移除 fd,再关闭 fd
_epoller_ptr->EpollerCtl(EPOLL_CTL_DEL, fd, 0);
close(fd);
}
else
{
lg(Warning, "recv error, fd is: %d", fd);
_epoller_ptr->EpollerCtl(EPOLL_CTL_DEL, fd, 0);
close(fd);
}
}
void Dispatcher(struct epoll_event revs[], int num)
{
for(int i = 0; i < num; ++i){
uint32_t event = revs[i].events;
int fd = revs[i].data.fd;
if(event & EVENT_IN){
// 读事件就绪
if(fd == _listenSocket_ptr->GetFd()){
// 获取到一个新连接,连接管理器
Accepter();
}
else{
// 其它 fd 上的普通读取事件就绪
Recver(fd);
}
}
else if(event & EVENT_OUT){
// 写事件就绪
// ...
}
else{
// ...
}
}
}
void Start()
{
// 将 listenSocket 添加到 epoll 中
// 也就是将 listenSocket 和它所关心的事件添加到内核 epoll 模型中的红黑树中!
_epoller_ptr->EpollerCtl(EPOLL_CTL_ADD, _listenSocket_ptr->GetFd(), EVENT_IN);
struct epoll_event revs[maxevents];
while(true)
{
// 其中 n 最大是 maxevents
int n = _epoller_ptr->EpollerWait(revs, maxevents);
if(n > 0)
{
// 有事件就绪,分派事件
lg(Debug, "event happend, fd is: %d", revs[0].data.fd);
Dispatcher(revs, n);
}
else if(n == 0)
{
lg(Info, "time out...");
}
else
{
lg(Error, "epoll wait error");
}
}
}
~EpollServer()
{
_listenSocket_ptr->Close();
}
private:
std::shared_ptr<Sock> _listenSocket_ptr;
std::shared_ptr<Epoller> _epoller_ptr;
uint16_t _port;
};
我们上面两个模块都用到了 NoCopy 这个类,也就是禁止拷贝,代码如下:
#pragma once
class NoCopy
{
public:
NoCopy(){}
NoCopy(const NoCopy&) = delete;
const NoCopy& operator=(const NoCopy&) = delete;
};
5. epoll 的工作模式
(1)水平触发 Level Triggered 工作模式(LT 模式)
epoll 默认所处的工作模式就是 LT 模式。例如我们上面所写的简单的 epoll 服务器,每次有新的连接到来时,如果我们不处理它,epoll 会每次都通知我们有连接到来了。这种一旦有新的连接到来,或者有新的数据到来,上层如果不取走,底层就会一直通知上层,让上层把数据尽快取走,这种模式就叫做 LT 模式。就像示波器中的高电平,一直有效。
(2)边缘触发 Edge Triggered 工作模式(ET 模式)
而 ET 模式指的是,数据或者连接,从无到有,从有到多,变化的时候,才会通知我们一次。正是因为 ET 模式有这种特点,才会倒逼程序员每次通知都必须把本轮数据全部取走,怎么保证数据全部取走呢?所以就需要循环读取,直到读取出错!但是我们使用 read() 或者 recv() 在缓冲区中读取数据的时候,当缓冲区的数据没有了,因为它们的读取方式默认是阻塞的,所以此时就会阻塞,服务器就会被挂起!所以我们在 ET 模式下,所有的 fd 必须是要设置为非阻塞的!
(3)LT 和 ET 的对比
select 和 poll 其实也是工作在 LT 模式下;epoll 既可以支持 LT,也可以支持 ET;
普遍地我们认为,ET 的工作模式比 LT 的工作模式通知效率更高,因为通知一次就可以倒逼上层把全部数据读取走。同时也看得出来 ET 模式的 IO 效率也更高,这也就意味着,TCP 会向对方通告一个更大的窗口,从而从概率上让对方一次给自己发送更多的数据!
所谓的 LT 模式和 ET 模式,本质就是向就绪队列中放入多个或者一个就绪的事件
但是 ET 模式就一定比 LT 模式的效率高吗?不一定!因为 LT 也可以将所有的 fd 设置为非阻塞,然后循环读取,也就是当通知一次的时候,就把数据全部取走了,就和 ET 一样了!所以谁的效率高不一定,要看具体的实现。
二、Reactor
1. 概念
我们在上面编写的 epoll 服务器的代码中,在其他普通的 fd 读取事件就绪时,也就是在 Recver() 中,读取是有问题的,因为我们不能区分每次读取上来的数据是一个完整的报文。另外还有其它各种问题,所以我们要对上面的代码使用 Reactor 的设计模式作修改。
所谓的 Reactor 是一种设计模式,翻译过来称为反应堆模式。用于处理事件驱动的系统中的并发操作。它提供了一种结构化的方式来处理输入事件,并将其分发给相应的处理程序。Reactor 模式通常用于网络编程中,特别是在服务器端应用程序中。
要进行正确的 IO 处理,就应该有如下的理解:在应用层一定存在大量的连接,每一个连接在应用层都叫做文件描述符。而在读取每一个文件描述符上的数据的时候,可能根本就没有读取完,此时我们就需要把该文件描述符上的数据临时保存起来。所以我们在写服务器的时候,我们要保证每一个文件描述符及其连接及其缓冲区,都是独立的!
2. 实现
(1)Epoller.hpp
Epoller.hpp 是对 epoll 的系统调用的封装,代码如下:
#pragma once
#include "NoCopy.hpp"
#include "log.hpp"
#include <sys/epoll.h>
#include <cstring>
#include <cerrno>
class Epoller : public NoCopy
{
static const int size = 128;
public:
Epoller()
{
_epfd = epoll_create(size);
if(_epfd == -1){
lg(Fatal, "epoll_create error: %s", strerror(errno));
}
else{
lg(Info, "epoll_create success: %d", _epfd);
}
}
int EpollerWait(struct epoll_event revents[], int num, int timeout)
{
int n = epoll_wait(_epfd, revents, num, timeout);
return n;
}
int EpollerCtl(int oper, int sockfd, uint32_t event)
{
int n = 0;
if(oper == EPOLL_CTL_DEL)
{
n = epoll_ctl(_epfd, oper, sockfd, nullptr);
if(n != 0){
lg(Error, "epoll_ctl delete error! sockfd: %d", sockfd);
}
}
else
{
// EPOLL_CTL_MOD 或者 EPOLL_CTL_ADD
// 设置进内核的红黑树中
struct epoll_event ev;
ev.events = event;
ev.data.fd = sockfd; // 方便我们后面处理的时候知道是哪一个 fd 就绪了
n = epoll_ctl(_epfd, oper, sockfd, &ev);
if(n != 0){
lg(Error, "epoll_ctl error!");
}
}
return n;
}
~Epoller()
{
if(_epfd >= 0){
close(_epfd);
}
}
private:
int _epfd;
int _timeout = 1000;
};
(2)TcpServer.hpp
TcpServer.hpp 是处理 IO 的服务器,代码如下:
#pragma once
#include <iostream>
#include <string>
#include <unordered_map>
#include <memory>
#include <functional>
#include <cerrno>
#include "log.hpp"
#include "NoCopy.hpp"
#include "Epoller.hpp"
#include "Socket.hpp"
#include "Comm.hpp"
// 设置 ET 模式
uint32_t EVENT_IN = (EPOLLIN | EPOLLET);
uint32_t EVENT_OUT = (EPOLLOUT | EPOLLET);
const static int g_buffer_size = 128;
class Connection;
class TcpServer;
using func_t = std::function<void(std::weak_ptr<Connection>)>;
using except_t = std::function<void(std::weak_ptr<Connection>)>;
// 管理每一个连接
class Connection
{
public:
Connection(int sockfd)
:_sockfd(sockfd)
{}
void SetHandler(func_t recv_cb, func_t send_cb, except_t except_cb)
{
_recv_cb = recv_cb;
_send_cb = send_cb;
_except_cb = except_cb;
}
~Connection()
{}
private:
int _sockfd;
// 充当缓冲区
std::string _inbuffer;
std::string _outbuffer;
public:
// 回指指针
// std::shared_ptr<TcpServer> _tcpServer_ptr;
std::weak_ptr<TcpServer> _tcpServer_ptr;
// 回调方法
func_t _recv_cb;
func_t _send_cb;
except_t _except_cb;
std::string _ip;
uint16_t _port;
int Sockfd()
{
return _sockfd;
}
void AppendInBuffer(const std::string& info)
{
_inbuffer += info;
}
void AppendOutBuffer(const std::string& info)
{
_outbuffer += info;
}
std::string& Inbuffer()
{
return _inbuffer;
}
std::string& Outbuffer()
{
return _outbuffer;
}
void SetWeakPtr(std::weak_ptr<TcpServer> tcpServer_ptr)
{
_tcpServer_ptr = tcpServer_ptr;
}
};
// enable_shared_from_this 可以提供返回当前对象的 this 对应的 shared_ptr
class TcpServer : public std::enable_shared_from_this<TcpServer>, public NoCopy
{
static const int num = 64;
public:
TcpServer(uint16_t port, func_t OnMessage)
:_port(port)
,_quit(true)
,_OnMessage(OnMessage)
,_epoller_ptr(new Epoller())
,_listenSock_ptr(new Sock())
{}
void Init()
{
_listenSock_ptr->Socket();
// 将 fd 设置为非阻塞
SetNonBlockOrDie(_listenSock_ptr->GetFd());
_listenSock_ptr->Bind(_port);
_listenSock_ptr->Listen();
lg(Info, "create listen socket success: %d\n", _listenSock_ptr->GetFd());
AddConnection(_listenSock_ptr->GetFd(), EVENT_IN, \
std::bind(&TcpServer::Accepter, this, std::placeholders::_1), nullptr, nullptr);
}
void AddConnection(int sockfd, uint32_t event, func_t recv_cb, func_t send_cb, except_t except_cb,\
const std::string& ip = "0.0.0.0", uint16_t port = 0)
{
// 1. 给所有的套接字建立一个 connection 对象
std::shared_ptr<Connection> new_connection(new Connection(sockfd));
// shared_from_this(): 返回当前对象的 shared_ptr
new_connection->SetWeakPtr(shared_from_this());
new_connection->SetHandler(recv_cb, send_cb, except_cb);
new_connection->_ip = ip;
new_connection->_port = port;
// 2. 将套接字和 Connection 添加到 unordered_map 中
_connections.insert(std::make_pair(sockfd, new_connection));
// 3. 将 listen 套接字或其它事件添加到 epoll 模型中
_epoller_ptr->EpollerCtl(EPOLL_CTL_ADD, sockfd, event);
lg(Debug, "add a new connection success, sockfd is: %d", sockfd);
}
// listen 套接字的连接管理器,即有事件就绪的时候,就是有连接到来,就需要处理新连接
void Accepter(std::weak_ptr<Connection> conn)
{
auto connection = conn.lock();
// 不断检测是否还有新连接,直到读取出错
while(true){
struct sockaddr_in peer;
socklen_t len = sizeof(peer);
int sockfd = ::accept(connection->Sockfd(), (sockaddr*)&peer, &len);
// 获取到新连接设置为非阻塞,然后构建 Connection 对象放入哈希表中和内核红黑树中
if(sockfd > 0){
uint16_t peer_port = ntohs(peer.sin_port);
char ipbuffer[128];
inet_ntop(AF_INET, &peer.sin_addr, ipbuffer, sizeof(ipbuffer));
lg(Debug, "get a new client, get info-> [%s: %d], sockfd: %d", ipbuffer, peer_port, sockfd);
SetNonBlockOrDie(sockfd);
AddConnection(sockfd, EVENT_IN, \
std::bind(&TcpServer::Recver, this, std::placeholders::_1),\
std::bind(&TcpServer::Sender, this, std::placeholders::_1),\
std::bind(&TcpServer::Excepter, this, std::placeholders::_1),\
ipbuffer, peer_port);
}
else{
if(errno == EWOULDBLOCK){ // 读取完毕
break;
}
else if(errno == EINTR){ // 信号原因中断
continue;
}
else{
break;
}
}
}
}
// 普通事件的事件管理器
// 对于服务器而言只需要进行IO,不需要关心是否读完和报文的格式
void Recver(std::weak_ptr<Connection> conn)
{
if(conn.expired()) return;
auto connection = conn.lock();
int sockfd = connection->Sockfd();
while(true){
char buffer[g_buffer_size];
memset(buffer, 0, sizeof(buffer));
ssize_t n = recv(sockfd, buffer, sizeof(buffer) - 1, 0); // 非阻塞读取
if(n > 0){
connection->AppendInBuffer(buffer);
}
else if(n == 0){
lg(Info, "sockfd: %d, client info %s: %d quit...", sockfd, connection->_ip.c_str(), connection->_port);
connection->_except_cb(connection);
}
else{
if(errno == EWOULDBLOCK){
break;
}
else if(errno == EINTR){
continue;
}
else{
lg(Warning, "sockfd: %d, client info %s: %d recv error...", sockfd, connection->_ip.c_str(), connection->_port);
connection->_except_cb(connection);
return;
}
}
}
// 交给上层处理,读取到的数据都在 connection 中
// 1. 检测
// 2. 如果有完整报文,就处理
_OnMessage(connection);
}
void Sender(std::weak_ptr<Connection> conn)
{
if(conn.expired()) return;
auto connection = conn.lock();
auto& outbuffer = connection->Outbuffer();
while(true){
ssize_t n = send(connection->Sockfd(), outbuffer.c_str(), outbuffer.size(), 0);
if(n > 0){
outbuffer.erase(0, n);
if(outbuffer.empty()){
break;
}
}
else if(n == 0){
return;
}
else{
if(errno == EWOULDBLOCK){
break;
}
else if(errno == EINTR){
continue;
}
else{
lg(Warning, "sockfd: %d, client info %s: %d send error...", connection->Sockfd(), connection->_ip.c_str(), connection->_port);
connection->_except_cb(connection);
return;
}
}
}
// 没发完
if(!outbuffer.empty()){
// 开始对写事件关心
EnableEvent(connection->Sockfd(), true, true);
}
else{
// 关闭对写事件关心
EnableEvent(connection->Sockfd(), true, false);
}
}
void Excepter(std::weak_ptr<Connection> connection)
{
if(connection.expired()) return;
auto conn = connection.lock();
lg(Debug, "Excepter hander sockfd: %d, client info %s: %d excepter handler", \
conn->Sockfd(), conn->_ip.c_str(), conn->_port);
// 1. 移除对特定 fd 的关心
// EnableEvent(connection->Sockfd(), false, false);
_epoller_ptr->EpollerCtl(EPOLL_CTL_DEL, conn->Sockfd(), 0);
// 2. 关闭异常的 fd
lg(Debug, "close %d done...\n", conn->Sockfd());
close(conn->Sockfd());
// 3. 从 _connections 中移除 fd 和 Connection 的映射关系
lg(Debug, "remove %d from _connections...\n", conn->Sockfd());
_connections.erase(conn->Sockfd());
}
void EnableEvent(int sockfd, bool readAble, bool writeAble)
{
uint32_t events = 0;
events |= ((readAble ? EPOLLIN : 0) | (writeAble ? EPOLLOUT : 0) | EPOLLET);
_epoller_ptr->EpollerCtl(EPOLL_CTL_MOD, sockfd, events);
}
bool IsConnectionExist(int fd)
{
auto iter = _connections.find(fd);
if(iter == _connections.end()){
return false;
}
else{
return true;
}
}
void Dispatcher(int timeout)
{
int n = _epoller_ptr->EpollerWait(revs, num, timeout);
for(int i = 0; i < n; i++){
uint32_t event = revs[i].events;
int sockfd = revs[i].data.fd;
// 一旦事件异常,统一把异常转换为读写问题
if(event & EPOLLERR){
event |= (EPOLLIN | EPOLLOUT);
}
if(event & EPOLLHUP){
event |= (EPOLLIN | EPOLLOUT);
}
// 只需要处理读写
if((event & EPOLLIN) && IsConnectionExist(sockfd)){
if(_connections[sockfd]->_recv_cb){
_connections[sockfd]->_recv_cb(_connections[sockfd]);
}
}
if((event & EPOLLOUT) && IsConnectionExist(sockfd)){
if(_connections[sockfd]->_send_cb){
_connections[sockfd]->_send_cb(_connections[sockfd]);
}
}
}
}
void Loop()
{
_quit = false;
// AddConnection();
while(!_quit)
{
// 事件派发
// Dispatcher(3000);
Dispatcher(-1);
PrintConnection();
}
_quit = true;
}
void PrintConnection()
{
std::cout << "_connections fd list: ";
for(auto& connection : _connections){
std::cout << connection.second->Sockfd() << ", ";
std::cout << "inbuffer: " << connection.second->Inbuffer() << " ";
}
std::cout << std::endl;
}
~TcpServer()
{}
private:
std::shared_ptr<Epoller> _epoller_ptr;
std::shared_ptr<Sock> _listenSock_ptr;
uint16_t _port;
bool _quit;
struct epoll_event revs[num];
// fd 到对应连接到映射,_connections 就是当前服务器管理的所有连接
std::unordered_map<int, std::shared_ptr<Connection>> _connections;
// 让上层处理信息
func_t _OnMessage;
};
(3)Calculator.hpp
Calculator.hpp 是上层处理业务的具体处理方法,代码如下:
#pragma once
#include <string>
#include <iostream>
#include "Protocol.hpp"
enum
{
DIV_ERR = 1,
MOD_ERR = 2,
OP_ERR = 3
};
// 上层业务
class Calculator
{
public:
Calculator()
{}
Response CalculatorHelper(const Request &req)
{
Response resp(0, 0);
switch (req._op)
{
case '+':
resp._result = req._x + req._y;
break;
case '-':
resp._result = req._x - req._y;
break;
case '*':
resp._result = req._x * req._y;
break;
case '%':
{
if (req._y == 0)
resp._code = MOD_ERR;
else
resp._result = req._x % req._y;
}
break;
case '/':
{
if (req._y == 0)
resp._code = DIV_ERR;
else
resp._result = req._x / req._y;
}
break;
default:
resp._code = OP_ERR;
break;
}
return resp;
}
// "len"\n"10 + 20"\n
std::string Handler(std::string &package)
{
std::string content;
bool ret = Decode(package, &content); // content = "10 + 20"
if (!ret)
return "";
Request req;
ret = req.Deserialize(content); // x = 10, y = 20, op = '+'
if (!ret)
return "";
content = "";
Response resp = CalculatorHelper(req); // result = 30, code = 0
resp.Serialize(&content); // content = "30 0"
content = Encode(content); // content = "len"\n"30 0\n"
return content;
}
~Calculator()
{}
};
(4)main.cpp
下面是主函数的调用:
#include <iostream>
#include <memory>
#include <functional>
#include "TcpServer.hpp" // 处理IO
#include "Calculator.hpp" // 处理业务
#include "log.hpp"
Calculator calculator;
void DefaultOnMessage(std::weak_ptr<Connection> conn)
{
if(conn.expired()) return;
auto connection_ptr = conn.lock();
std::cout << "Application layerget a message: " << connection_ptr->Inbuffer() << std::endl;
// 对报文进行处理
std::string response_str = calculator.Handler(connection_ptr->Inbuffer());
if(response_str.empty()){
return;
}
lg(Debug, "%s", response_str.c_str());
// response_str 发送出去
connection_ptr->AppendOutBuffer(response_str);
// 因为写事件(发送缓冲区是否有空间,经常是ok的),经常是就绪的
// 所以如果我们设置对 EPOLLOUT 关心,那么 EPOLLOUT 几乎每次都是就绪的
// 就导致 epollserver 经常返回,浪费 CPU 资源
// 所以,对于读取,我们设置为常关心;对于写,我们设置为按需设置
// 处理写事件:直接写入,如果写入完成,就结束。
// 如果写入完成,但是数据还没有写完,_outbuffer 里还有内容,我们就需要设置对写事件进行关心了,如果写完了,就去掉写事件的关心
// connection_ptr->_send_cb(connection_ptr);
auto tcpserver = connection_ptr->_tcpServer_ptr.lock();
tcpserver->Sender(connection_ptr);
}
int main()
{
std::shared_ptr<TcpServer> tcp_svr(new TcpServer(8888, DefaultOnMessage));
tcp_svr->Init();
tcp_svr->Loop();
return 0;
}
其中有一些头文件例如 Socket.hpp 和 log.hpp 我们以前已经用过,这里就不再放出来了。
(5)CMakeLists.txt
cmake_minimum_required(VERSION 2.8)
project(ReactorServer)
add_executable(reactorServer main.cc)
target_link_libraries(reactorServer jsoncpp)
add_executable(clientCal ClientCal.cc)
target_link_libraries(clientCal jsoncpp)
3. 总结
Reactor 其实是一个半同步半异步模型,那么 IO 等于等待+数据拷贝,所以 Reactor 的半同步半异步体现在,等待是由 epoll 完成,这是体现同步;异步体现在 Reactor 可以进行回调处理。
在 Reactor 模式中,有一个事件循环(Event Loop)负责监听和分发事件。当有新的事件到达时,事件循环会将其分发给相应的处理程序进行处理。这种方式可以实现高效的并发处理,避免了线程创建和销毁的开销。