文章目录
1.std::enable_shared_from_this
在C++中,TcpServer类继承自std::enable_shared_from_this是一个非常有用的设计模式,特别是在需要在一个类的成员函数内部安全地创建或返回指向该对象自身的std::shared_ptr时。下面我将详细解释这一设计模式的用途和如何在TcpServer类中实现它。
为什么使用std::enable_shared_from_this?
std::enable_shared_from_this是一个模板类,它允许派生类的成员函数返回指向该对象的std::shared_ptr。这对于那些需要在内部创建并返回shared_ptr的类特别有用,尤其是在这些shared_ptr需要与其他对象共享所有权时。
直接在一个类的成员函数内部创建一个std::shared_ptr(this)是不安全的,因为这会导致两个独立的shared_ptr指向同一个对象,但它们各自管理着独立的计数器,这可能导致对象被删除两次(double-delete)的风险。
如何使用std::enable_shared_from_this?
继承自std::enable_shared_from_this:首先,你的类需要继承自std::enable_shared_from_this。在你的例子中,TcpServer类继承自std::enable_shared_from_this。
使用shared_from_this():在你的类成员函数内部,你可以使用shared_from_this()成员函数来安全地获取指向当前对象的std::shared_ptr。shared_from_this()会返回一个std::shared_ptr(或你的类类型),该指针与已经存在的shared_ptr共享所有权。
示例
假设TcpServer类需要在一个成员函数内部创建一个新的连接,并返回指向该连接的std::shared_ptr,同时这个连接也需要保持对TcpServer的引用。
cpp
#include <memory>
#include <iostream>
class TcpServer : public std::enable_shared_from_this<TcpServer> {
public:
std::shared_ptr<TcpServer> getServerPtr() {
// 安全地返回指向当前对象的shared_ptr
return shared_from_this();
}
// 假设这是处理新连接的函数
void handleNewConnection() {
// 注意:在调用shared_from_this()之前,
// 必须确保至少有一个shared_ptr<TcpServer>已经管理着这个对象。
// 否则,shared_from_this()会抛出std::bad_weak_ptr异常。
// 假设我们在这里创建一个新的连接,并需要返回TcpServer的shared_ptr
std::cout << "Creating a new connection and returning a shared_ptr to TcpServer\n";
// 示例:将TcpServer的shared_ptr传递给某个需要它的对象
}
// 构造函数和其他成员函数...
};
int main() {
// 创建一个shared_ptr来管理TcpServer对象
auto tcpSvr_SharedPtr = std::make_shared<TcpServer>();
// 现在可以安全地在TcpServer的成员函数内部调用shared_from_this()
auto serverPtr = tcpSvr_SharedPtr->getServerPtr();
// 使用serverPtr...
}
注意事项
在调用shared_from_this()之前,必须确保至少有一个std::shared_ptr正在管理该对象。否则,shared_from_this()会抛出std::bad_weak_ptr异常。
std::enable_shared_from_this通常与std::make_shared一起使用,因为std::make_shared会初始化std::shared_ptr并管理对象的生命周期。
std::enable_shared_from_this不能用于裸指针(raw pointers)或std::unique_ptr,因为它依赖于std::shared_ptr的控制块来跟踪对象的所有权。
2.std::bind
C++11 引入的一个非常有用的函数模板,它用于将可调用对象(如函数、成员函数、函数对象、lambda 表达式等)与其参数绑定起来,生成一个新的可调用对象。这个新对象在被调用时,会调用原始的可调用对象,并传入绑定时指定的参数(如果有的话),以及调用时提供的任何额外参数。
下面是一些 std::bind 的经典使用样例:
- 绑定函数和它的参数
假设有一个简单的函数,我们想提前绑定它的一个或多个参数。
cpp
#include <iostream>
#include <functional>
void print(int x, double y) {
std::cout << "x: " << x << ", y: " << y << std::endl;
}
int main() {
auto boundFunc = std::bind(print, std::placeholders::_1, 3.14);
boundFunc(10); // 输出: x: 10, y: 3.14
return 0;
}
这里,std::placeholders::_1 是一个占位符,表示 boundFunc 被调用时接收的第一个参数将会替换这个占位符。
- 绑定成员函数和它的对象实例
假设我们有一个类,想要绑定它的一个成员函数,并同时绑定调用该成员函数的对象实例。
cpp
#include <iostream>
#include <functional>
class MyClass {
public:
void show(int x) {
std::cout << "x: " << x << std::endl;
}
};
int main() {
MyClass obj;
auto boundMemberFunc = std::bind(&MyClass::show, &obj, std::placeholders::_1);
boundMemberFunc(42); // 输出: x: 42
return 0;
}
注意,这里我们使用了 &obj 来指定 show 成员函数将要被调用的对象实例。
在C++中,std::bind是一个非常有用的工具,特别是当你需要绑定成员函数时。成员函数与普通的自由函数(或静态成员函数)不同,因为它们需要一个对象实例来被调用。std::bind允许你绑定这个对象实例,以及成员函数的任何额外参数,生成一个新的可调用对象。
class Sub {
public:
static int sub(int a, int b) {
return a - b;
}
};
std::function<int(int, int)> funcSub = std::bind(&Sub::sub, std::placeholders::_1, std::placeholders::_2);
// 或者更简单地,因为静态成员函数不需要对象实例,你可以直接这样做:
std::function<int(int, int)> funcSub = Sub::sub;
基本概念
成员函数是类的成员,它们需要访问类的非静态成员(包括数据成员和其他成员函数)。因此,调用成员函数时,需要指定一个对象实例(对于非静态成员函数),除非该函数是静态的。
使用std::bind绑定成员函数
当你想要使用std::bind来绑定一个成员函数时,你需要做两件事:
- 指定成员函数所属的对象实例(对于非静态成员函数)。这通常是通过传递对象的指针或引用来完成的。
- 指定要绑定的成员函数。使用成员函数指针的语法(&ClassName::MemberFunctionName)。
示例
假设我们有一个简单的类,它有一个成员函数,我们想要使用std::bind来绑定这个成员函数。
cpp
#include <iostream>
#include <functional>
class MyClass {
public:
void greet(const std::string& name) const {
std::cout << "Hello, " << name << "!" << std::endl;
}
};
int main() {
MyClass obj;
// 绑定成员函数和对象实例
auto boundGreet = std::bind(&MyClass::greet, &obj, std::placeholders::_1);
// 调用boundGreet,传入要打印的名字
boundGreet("Alice"); // 输出: Hello, Alice!
return 0;
}
在这个例子中,&MyClass::greet是指向MyClass类中greet成员函数的指针。&obj是MyClass类的一个对象实例的地址,它将被用作greet成员函数调用的上下文(即this指针)。std::placeholders::_1是一个占位符,它表示在调用boundGreet时提供的第一个参数将用作greet函数的name参数。
注意事项
对象的生命周期:当你使用std::bind绑定一个成员函数和对象实例时,确保在调用生成的可调用对象之前,该对象实例是有效的。如果对象在绑定之后但在调用之前被销毁,那么调用将会是不确定的,可能导致程序崩溃。
const成员函数:如果成员函数是const的(如上例中的greet),则可以使用const对象实例或非const对象实例来绑定它。但是,如果成员函数不是const的,则必须使用非const对象实例来绑定它。
拷贝和移动:通过std::bind生成的可调用对象可以像其他对象一样被拷贝和移动。但是,请注意,这不会影响绑定的对象实例或成员函数。它只是拷贝或移动了绑定状态本身。
lambda表达式:在许多情况下,lambda表达式提供了一种更简洁、更直观的方式来定义和绑定成员函数(以及其他类型的可调用对象)。因此,虽然std::bind仍然很有用,但lambda表达式在C++11及更高版本中更为流行。
- 绑定多个参数
可以绑定多个参数,并在调用时提供剩余的参数。
cpp
#include <iostream>
#include <functional>
void add(int x, int y, int z) {
std::cout << "Sum: " << (x + y + z) << std::endl;
}
int main() {
auto boundAdd = std::bind(add, std::placeholders::_1, 10, std::placeholders::_2);
boundAdd(5, 20); // 输出: Sum: 35
return 0;
}
在这个例子中,boundAdd 接收两个参数,第一个和第三个参数在调用时提供,第二个参数在绑定时就被固定为 10。
注意事项
std::bind 生成的可调用对象可以像函数一样被调用,也可以被赋值给函数指针(如果返回类型和参数列表兼容的话,但通常不这么做,因为 std::bind 返回的是一个特殊的函数对象类型)。
在 C++11 及之后的版本中,lambda 表达式提供了更为灵活和强大的方式来定义和绑定函数,因此 std::bind 的使用频率有所下降。不过,在某些特定场景下,std::bind 仍然有其用武之地。
3.std::make_shared
C++11及以后版本中引入的一个非常有用的函数模板,它位于头文件中。std::make_shared的主要目的是以一种类型安全且高效的方式创建并返回一个指向动态分配对象的std::shared_ptr智能指针。
为什么使用 std::make_shared?
性能优化:使用std::make_shared可以只进行一次内存分配,同时分配控制块(用于存储shared_ptr的引用计数和可能的删除器)和对象本身的空间。相比之下,如果你首先使用new操作符创建对象,然后将其传递给std::shared_ptr的构造函数,那么至少需要两次内存分配:一次为对象本身,另一次为控制块。
类型安全:std::make_shared通过模板参数直接推导对象的类型,避免了显式类型转换的需要,从而提高了代码的安全性和可读性。
简化代码:使用std::make_shared可以使得代码更加简洁,特别是当你需要创建一个std::shared_ptr指向的对象并传递多个参数给其构造函数时。
如何使用 std::make_shared?
基本语法如下:
cpp
#include <memory>
std::shared_ptr<T> ptr = std::make_shared<T>(args...);
其中T是你想要创建的对象的类型,args…是传递给T的构造函数的参数(如果有的话)。
示例
假设我们有一个简单的类MyClass,它有一个接受整数的构造函数:
cpp
#include <iostream>
#include <memory>
class MyClass {
public:
MyClass(int value) : value_(value) {}
void printValue() const {
std::cout << "Value: " << value_ << std::endl;
}
private:
int value_;
};
int main() {
// 使用 std::make_shared 创建一个 MyClass 对象的 std::shared_ptr
std::shared_ptr<MyClass> ptr = std::make_shared<MyClass>(42);
// 调用 MyClass 对象的成员函数
ptr->printValue(); // 输出: Value: 42
return 0;
}
在这个例子中,std::make_shared(42)创建了一个MyClass类型的对象,其构造函数被初始化为42,并返回一个指向该对象的std::shared_ptr。这个智能指针随后被用来访问对象的成员函数printValue。
4.std::shared_ptr
来管理动态分配的内存(例如通过new操作符分配的对象)是一种常见的做法,特别是在涉及到资源管理和生命周期控制的场景中。在你的例子中,使用std::shared_ptr来管理TcpServer对象的实例有几个主要的好处:
- 自动内存管理:std::shared_ptr通过自动管理其所指向对象的生命周期,减少了内存泄漏的风险。当最后一个std::shared_ptr指向的对象被销毁时(例如,当shared_ptr超出作用域或被显式重置时),它所指向的对象也会被自动删除(调用delete),从而释放了分配的内存。这对于管理长时间运行的服务器程序尤其重要,因为内存泄漏可能在长时间运行后累积成大问题。
- 异常安全:在C++中,异常处理是编程的一个重要方面。如果在使用裸指针管理资源时发生异常,并且没有适当的异常处理机制来确保资源被正确释放,那么可能会导致资源泄漏。使用std::shared_ptr,即使发生异常,只要shared_ptr的析构函数被调用(例如在栈展开的过程中),它所管理的资源也会被自动释放。
- 共享所有权:std::shared_ptr允许多个shared_ptr实例共享对同一对象的所有权。这在你需要将对象传递给多个函数或组件,并且希望这些函数或组件都能管理对象的生命周期时非常有用。然而,在你的例子中,你似乎只创建了一个shared_ptr来管理TcpServer,但这并不妨碍std::shared_ptr提供的自动内存管理和异常安全的好处。
- 灵活性:std::shared_ptr是C++标准库的一部分,因此它提供了高度的灵活性和可移植性。你可以在任何支持C++11或更高版本的编译器上使用它,而不需要担心特定平台或库的依赖问题。
与其他智能指针的互操作性:std::shared_ptr可以与标准库中的其他智能指针(如std::weak_ptr)配合使用,以提供更细粒度的控制权和避免循环引用等问题。
综上所述,使用std::shared_ptr来管理TcpServer对象的实例是一个很好的实践,它可以帮助你编写更安全、更健売、更易于维护的C++代码。
std::shared_ptr 和 std::weak_ptr配合使用
在C++中,std::shared_ptr 和 std::weak_ptr 经常一起使用来管理共享对象的生命周期,同时避免循环引用等问题。std::shared_ptr 提供了对对象的共享所有权,当最后一个 shared_ptr 被销毁或重置时,它所指向的对象也会被自动删除。而 std::weak_ptr 提供了一种对 shared_ptr 所管理对象的非拥有性观察(观察但不拥有)。它不会增加对象的所有权计数,因此不会阻止对象的销毁。
以下是一个 std::shared_ptr 和 std::weak_ptr 配合使用的简单样例:
cpp
#include <iostream>
#include <memory>
class MyClass {
public:
MyClass(int value) : value_(value) {}
void print() const { std::cout << "Value: " << value_ << std::endl; }
~MyClass() { std::cout << "MyClass destroyed." << std::endl; }
private:
int value_;
};
int main() {
// 创建一个 shared_ptr
std::shared_ptr<MyClass> ptr = std::make_shared<MyClass>(10);
// 创建一个 weak_ptr 指向相同的对象
std::weak_ptr<MyClass> weakPtr = ptr;
// 使用 weak_ptr 之前,先将其转换为 shared_ptr
if (auto lockedPtr = weakPtr.lock()) {
lockedPtr->print(); // 安全地使用对象
} else {
std::cout << "The object has been destroyed." << std::endl;
}
// 当 ptr 被销毁时,如果没有其他 shared_ptr 指向 MyClass 实例,
// MyClass 实例也会被销毁。
// 但 weakPtr 仍然可以存在,只是它不再指向任何有效的对象。
ptr.reset(); // 手动重置 ptr
if (auto lockedPtr = weakPtr.lock()) {
lockedPtr->print();
} else {
std::cout << "The object has been destroyed." << std::endl;
}
return 0;
}
在这个例子中,我们首先创建了一个 std::shared_ptr 类型的 ptr,它指向一个 MyClass 的实例。然后,我们创建了一个 std::weak_ptr 类型的 weakPtr,它指向与 ptr 相同的对象。我们使用 weakPtr.lock() 尝试将 weakPtr 转换为 shared_ptr,这样我们就可以安全地访问对象(如果它仍然存在)。当 ptr 被销毁时,如果这是最后一个指向对象的 shared_ptr,则对象也会被销毁。此时,尝试通过 weakPtr 访问对象将失败,因为 weakPtr.lock() 将返回一个空的 shared_ptr。
这种机制特别有用,例如,在缓存、事件监听或任何需要避免循环引用的场景中。
本文代码含有大量实参是共享指针 形参以弱指针接收 在函数内部转化为共享指针使用的例子
5.剖析代码
创建服务器实例:调用epoll_create()等
服务器初始化 + 添加 监听fd及其所关心事件
服务器调用loop 进入循环
等待事件就绪 并 派发事件给指定处理函数;fd有效且当前fd关心的事件有对应的回调函数 执行回调函数;某一fd其关心事件指定的回调在这里设置:
一旦有客户端发起连接 连接事件就绪 监听套接字fd 关心的event_in 就绪,它对应的accepter就会被回调!连接事件就绪后,此时就需要将服务套接字和其关心的事件及其回调设置进去。
之后,该fd上有读事件就绪就调用recver 写就sender 异常就excepter
收发异常具体看代码
6.整体代码
之前编写的 epoll - echo服务器的代码中,在其他普通的 fd 读取事件就绪时,也就是在 Recver() 中,读取是有问题的,因为我们不能区分每次读取上来的数据是一个完整的报文。另外还有其它各种问题,所以我们要对上面的代码使用 Reactor 的设计模式作修改。
Reactor 是一种设计模式,称为反应堆模式。用于处理事件驱动的系统中的并发操作。提供了一种结构化的方式来处理输入事件,并将其分发给相应的处理程序。Reactor 模式通常用于网络编程中,特别是在服务器端应用程序中。
要进行正确的 IO 处理,就应该有如下的理解:在应用层一定存在大量的连接,每一个连接在应用层都叫做文件描述符。而在读取每一个文件描述符上的数据的时候,可能根本就没有读取完,此时我们就需要把该文件描述符上的数据临时保存起来。所以我们在写服务器的时候,我们要保证每一个文件描述符及其连接及其缓冲区,都是独立的!
Reactor 其实是一个半同步半异步模型,IO 等于等待+数据拷贝,Reactor 的半同步半异步体现在,等待是由 epoll 完成,体现同步;异步体现在 Reactor 可以进行回调处理。
在 Reactor 模式中,有一个事件循环(Event Loop)负责监听和分发事件。当有新的事件到达时,事件循环会将其分发给相应的处理程序进行处理。这种方式可以实现高效的并发处理,避免了线程创建和销毁的开销。
对读写事件的关心区别
- epoll/select/poll, 因为写事件(发送缓冲区是否有空间,经常是OK的), 经常就是就绪的
- 如果我们设置对EPOLLOUT关心,EPOLLOUT几乎每次都有就绪
- 导致epollserver经常返回,浪费CPU的资源
- 结论:对于读,设置常关心。对于写,按需设置 什么是按需设置?
- 怎么处理写呢?直接写入,如果写入完成,就结束,如果写入完成,但是数据没有写完,outbufer里还有内容,我们就需要设置对写事件进行关心了。如果写完了,去掉对写事件的关心!
按需设置对写事件的关心
Calculator.hpp
#pragma once
#include <iostream>
#include "Protocol.hpp"
enum
{
Div_Zero = 1,
Mod_Zero,
Other_Oper
};
// 上层业务:将客户端发来的数据 解码+反序列化 计算 序列化+编码
class Calculator
{
public:
Calculator()
{
}
Response CalculatorHelper(const Request &req)
{
Response resp(0, 0);
switch (req.op)
{
case '+':
resp._result = req.x + req.y;
break;
case '-':
resp._result = req.x - req.y;
break;
case '*':
resp._result = req.x * req.y;
break;
case '/':
{
if (req.y == 0)
resp._code = Div_Zero;
else
resp._result = req.x / req.y;
}
break;
case '%':
{
if (req.y == 0)
resp._code = Mod_Zero;
else
resp._result = req.x % req.y;
}
break;
default:
resp._code = Other_Oper;
break;
}
return resp;
}
// "len"\n"10 + 20"\n
std::string Handler(std::string &stringMsg)
{
std::string tmpBuffer;
bool r = Decode(stringMsg, &tmpBuffer); // "len"\n"10 + 20"\n
if (!r)
return "";
Request req;
r = req.Deserialize(tmpBuffer); // "10 + 20" ->x=10 op=+ y=20
if (!r)
return "";
tmpBuffer = "";
Response resp = CalculatorHelper(req); // result=30 code=0;
resp.Serialize(&tmpBuffer); // "30 0"
tmpBuffer = Encode(tmpBuffer); // "len"\n"30 0"
return tmpBuffer;
}
~Calculator()
{
}
};
ClientCal.cc
#include <iostream>
#include <string>
#include <cstring>
#include <ctime>
#include <cassert>
#include <unistd.h>
#include "Socket.hpp"
#include "Protocol.hpp"
//客户端:生成随机数据 序列化+编码 发送; 解码+反序列化 读取结果
static void Usage(const std::string &proc)
{
std::cout << "\nUsage: " << proc << " serverip serverport\n"
<< std::endl;
}
// ./clientcal ip port
int main(int argc, char *argv[])
{
if (argc != 3)
{
Usage(argv[0]);
exit(0);
}
std::string serverip = argv[1];
uint16_t serverport = std::stoi(argv[2]);
Sock sockfd;
sockfd.Socket();
bool connectFlag = sockfd.Connect(serverip, serverport);
if (!connectFlag)
return 1;
srand(time(nullptr) ^ getpid());
int cnt = 1;
const std::string opers = "+-*/%=-=&^";
std::string inbuffer_stream;
while (cnt <= 10)
{
std::cout << "===============第" << cnt << "次测试....., " << "===============" << std::endl;
int x = rand() % 100 + 1;
usleep(1234);
int y = rand() % 100;
usleep(4321);
char oper = opers[rand() % opers.size()];
Request req(x, y, oper);
req.DebugPrint();
std::string Serialized_StringMsg;
req.Serialize(&Serialized_StringMsg);
Serialized_StringMsg = Encode(Serialized_StringMsg);
write(sockfd.getSocketFd(), Serialized_StringMsg.c_str(), Serialized_StringMsg.size());
char buffer[128];
ssize_t n = read(sockfd.getSocketFd(), buffer, sizeof(buffer)); // 无法保证能读到一个完整的报文
if (n > 0)
{
buffer[n] = 0;
inbuffer_stream += buffer; // "len"\n"result code"\n
std::cout << inbuffer_stream << std::endl;
std::string Decoded_StringMsg;
bool r = Decode(inbuffer_stream, &Decoded_StringMsg); // "result code"
assert(r);
Response resp;
r = resp.Deserialize(Decoded_StringMsg);
assert(r);
resp.DebugPrint();
}
std::cout << "=================================================" << std::endl;
sleep(1);
cnt++;
}
sockfd.CloseFd();
return 0;
}
CMakeLists.txt
cmake_minimum_required(VERSION 3.29)
set(CMAKE_CXX_STANDARD 11)
set(CMAKE_CXX_STANDARD_REQUIRED True)
project(Reactor)
add_executable(reactor_server Main.cc)
target_link_libraries(reactor_server jsoncpp)
add_executable(client ClientCal.cc)
target_link_libraries(client jsoncpp)
Common.hpp
#pragma once
#include <unistd.h>
#include <stdlib.h>
#include <fcntl.h>
#define NON_BLOCK_ERR 5
void SetNonBlockOrDie(int sock)
{
int curFlag = fcntl(sock, F_GETFL);
if (curFlag < 0)
exit(NON_BLOCK_ERR);
fcntl(sock, F_SETFL, curFlag | O_NONBLOCK);
}
Epoller.hpp
#pragma once
#include "nocopy.hpp"
#include "Log.hpp"
#include <cerrno>
#include <cstring>
#include <sys/epoll.h>
class Epoller : public nocopy
{
static const int size = 128;
private:
int _epollFd;
int _timeout{3000};
public:
Epoller()
{
_epollFd = epoll_create(size);
if (_epollFd == -1)
lg(Error, "epoll_create error: %s", strerror(errno));
else
lg(Info, "epoll_create success: %d", _epollFd);
}
int EpollerWait(struct epoll_event revents[], int num)
{
// int epoll_wait(int __epfd, epoll_event *__events, int __maxevents, int __timeout)
int n = epoll_wait(_epollFd, revents, num, /*_timeout 0*/ -1);
return n;
}
int EpllerUpdate(int oper, int sock, uint32_t event)
{
// int epoll_ctl(int __epfd, int __op, int __fd, epoll_event *)
int n = 0;
if (oper == EPOLL_CTL_DEL)
{
n = epoll_ctl(_epollFd, oper, sock, nullptr);
if (n != 0)
lg(Error, "epoll_ctl delete error!");
}
else // EPOLL_CTL_MOD || EPOLL_CTL_ADD
{
struct epoll_event ev;
ev.events = event;
ev.data.fd = sock; // 方便后期得知 是哪一个fd就绪了
n = epoll_ctl(_epollFd, oper, sock, &ev);
if (n != 0)
lg(Error, "epoll_ctl error!");
}
return n;
}
~Epoller()
{
if (_epollFd >= 0)
close(_epollFd);
}
};
Log.hpp
#pragma once
#include <iostream>
#include <cstdio>
#include <time.h>
#include <stdarg.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdlib.h>
#define SIZE 1024
#define Info 0
#define Debug 1
#define Warning 2
#define Error 3
#define Fatal 4
#define Screen 1
#define Onefile 2
#define Classfile 3
#define LogFile "log.txt"
using namespace std;
class Log
{
private:
int printMethod;
std::string path;
public:
Log()
{
printMethod = Screen;
path = "./";
}
void Enable(int method)
{
printMethod = method;
}
std::string levelToString(int level)
{
switch (level)
{
case Info:
return "Info";
case Debug:
return "Debug";
case Warning:
return "Warning";
case Error:
return "Error";
case Fatal:
return "Fatal";
default:
return "None";
}
}
/*
void logmessage(int level, const char *format, ...)
{
time_t t = time(nullptr);
struct tm *ctime = localtime(&t);
char leftbuffer[SIZE];
snprintf(leftbuffer, sizeof(leftbuffer), "[%s][%d-%d-%d %d:%d:%d]", levelToString(level).c_str(),
ctime->tm_year + 1900, ctime->tm_mon + 1, ctime->tm_mday,
ctime->tm_hour, ctime->tm_min, ctime->tm_sec);
va_list s;
va_start(s, format);
char rightbuffer[SIZE];
vsnprintf(rightbuffer, sizeof(rightbuffer), format, s);
va_end(s);
// 格式:默认部分+自定义部分
char logtxt[SIZE * 2];
snprintf(logtxt, sizeof(logtxt), "%s %s\n", leftbuffer, rightbuffer);
// printf("%s", logtxt);
printLog(level, logtxt);
}
*/
// lg(Warning, "accept error, %s: %d", strerror(errno), errno);
void operator()(int level, const char *msg_format, ...)
{
time_t timestamp = time(nullptr);
struct tm *ctime = localtime(×tamp);
//level 年月日
char leftbuffer[SIZE];
snprintf(leftbuffer, sizeof(leftbuffer), "[%s][%d-%d-%d %d:%d:%d]", levelToString(level).c_str(),
ctime->tm_year + 1900, ctime->tm_mon + 1, ctime->tm_mday,
ctime->tm_hour, ctime->tm_min, ctime->tm_sec);
//自定义msg
va_list arg_list;//存储可变参数列表信息
va_start(arg_list, msg_format);//初始化 使其指向函数参数列表中format参数之后的第一个可变参数
char rightbuffer[SIZE];
vsnprintf(rightbuffer, sizeof(rightbuffer), msg_format, arg_list);
va_end(arg_list);//清理va_list变量
// 格式:默认部分+自定义部分
char log_content[SIZE * 2];
snprintf(log_content, sizeof(log_content), "%s %s", leftbuffer, rightbuffer);
// printf("%s", logtxt); // 暂时打印
printLog(level, log_content);
}
void printLog(int level, const std::string &log_content)
{
switch (printMethod)
{
case Screen:
std::cout << log_content << std::endl;
break;
case Onefile:
printOneFile(LogFile, log_content);
break;
case Classfile:
printClassFile(level, log_content);
break;
default:
break;
}
}
void printOneFile(const std::string &log_filename, const std::string &log_content)
{
//path = "./"; #define LogFile "log.txt"
std::string _logFilename = path + log_filename;
int fd = open(_logFilename.c_str(), O_WRONLY | O_CREAT | O_APPEND, 0666); // "log.txt"
if (fd < 0)
return;
write(fd, log_content.c_str(), log_content.size());
close(fd);
}
void printClassFile(int level, const std::string &log_content)
{
//#define LogFile "log.txt"
std::string filename = LogFile;
filename += ".";
filename += levelToString(level); // "log.txt.Debug"
printOneFile(filename, log_content);
}
~Log()
{
}
};
Log lg;
/*
int sum(int n, ...)
{
va_list s; // char*
va_start(s, n);
int sum = 0;
while(n)
{
sum += va_arg(s, int); // printf("hello %d, hello %s, hello %c, hello %d,", 1, "hello", 'c', 123);
n--;
}
va_end(s); //s = NULL
return sum;
}
*/
Main.cc
#include <iostream>
#include <functional>
#include <memory>
#include "Log.hpp"
#include "TcpServer.hpp" // 处理IO的
#include "Calculator.hpp" // 处理业务的
// 我们的业务逻辑比较简单 没有特别耗时的操作 如果有 收到的数据不一定是完整的数据
Calculator calculator;
// weak_ptr:协助shared_ptr工作 只可以从一个shared_ptr或另一个weak_ptr对象构造
// 它的构造和析构不会引起引用记数的增加或减少
void DefaultOnMessage(std::weak_ptr<TcpConnection> connectionWeakPtr)
{
// 判断当前weak_ptr智能指针是否 过期/还有托管的对象
// expired() == true 即use_count() == 0 即已经没有托管的对象了 即过期了
// 可能还有析构函数进行释放内存 但此对象的析构已经临近或可能已发生
if (connectionWeakPtr.expired())
return;
// 转为shared_ptr
auto connectionSharedPtr = connectionWeakPtr.lock();
// 将数据从接收缓冲区中取出来
// 1. echo 客户端发来的数据 2. 将数据交给业务层处理
std::cout << "client send: " << connectionSharedPtr->getInbuffer() << std::endl;
std::string response_str = calculator.Handler(connectionSharedPtr->getInbuffer());
if (response_str.empty())
return;
lg(Debug, "response_str: %s", response_str.c_str());
// 将处理好的数据拷贝到发送缓冲区
connectionSharedPtr->AppendOutBuffer(response_str);
auto tcpSvr_SharedPtr = connectionSharedPtr->_tcpSvr_WeakPtr.lock();
tcpSvr_SharedPtr->Sender(connectionSharedPtr);
}
int main()
{
std::shared_ptr<TcpServer> epollTcpSvr_SharedPtr(new TcpServer(8888, DefaultOnMessage));
epollTcpSvr_SharedPtr->Init();
epollTcpSvr_SharedPtr->Loop();
return 0;
}
nocopy.hpp
#pragma once
class nocopy
{
public:
nocopy() {}
nocopy(const nocopy &) = delete;
const nocopy &operator=(const nocopy &) = delete;
};
Protocol.hpp
#pragma once
#include <iostream>
#include <string>
#include <cstring>
#include <jsoncpp/json/json.h>
//协议:订制协议 分别提供请求和响应的序列反序列化 并提供编码解码接口
// #define MySelf 1
const std::string blank_space_sep = " ";
const std::string protocol_sep = "\n";
//编码:"XXXXXX"len"\n"x op y"\nXXXXXX
std::string Encode(std::string &stringMsg)
{
std::string encoded_strMsg = std::to_string(stringMsg.size());
encoded_strMsg += protocol_sep;
encoded_strMsg += stringMsg;
encoded_strMsg += protocol_sep;
return encoded_strMsg;
}
// 解码:"XXXXXX"len"\n"x op y"\nXXXXXX
bool Decode(std::string &encoded_strMsg, std::string *stringMsg)
{
std::size_t pos = encoded_strMsg.find(protocol_sep);
if (pos == std::string::npos)
return false;
std::string len_str = encoded_strMsg.substr(0, pos);
std::size_t len = std::stoi(len_str);
std::size_t total_len = len_str.size() + len + 2;
if (encoded_strMsg.size() < total_len)
return false;
*stringMsg = encoded_strMsg.substr(pos + 1, len);
encoded_strMsg.erase(0, total_len);
return true;
}
// 请求类 struct {x op y} <==> string
class Request
{
public:
// x op y
int x;
int y;
char op; // + - * / %
public:
Request(int data1, int data2, char oper)
: x(data1),
y(data2),
op(oper)
{
}
Request()
{
}
public:
// 序列化:数据结构-->字节序列
bool Serialize(std::string *structed_Data)
{
#ifdef MySelf
// 构建报文的有效载荷 struct => string, "x op y"
std::string s = std::to_string(x);
s += blank_space_sep;
s += op;
s += blank_space_sep;
s += std::to_string(y);
*structed_Data = s;
return true;
#else
Json::Value root;
root["x"] = x;
root["y"] = y;
root["op"] = op;
// Json::FastWriter w;
Json::StyledWriter w;
*structed_Data = w.write(root);
return true;
#endif
}
// 反序列化:字节序列-->数据结构
bool Deserialize(const std::string &stringMsg) // "x op y"
{
#ifdef MySelf
std::size_t left = stringMsg.find(blank_space_sep);
if (left == std::string::npos)
return false;
std::string part_x = stringMsg.substr(0, left);
std::size_t right = stringMsg.rfind(blank_space_sep);
if (right == std::string::npos)
return false;
std::string part_y = stringMsg.substr(right + 1);
if (left + 2 != right)
return false;
op = stringMsg[left + 1];
x = std::stoi(part_x);
y = std::stoi(part_y);
return true;
#else
Json::Value root;
Json::Reader r;
r.parse(stringMsg, root);
x = root["x"].asInt();
y = root["y"].asInt();
op = root["op"].asInt();
return true;
#endif
}
// 测试打印
void DebugPrint()
{
std::cout << "新请求构建完成: " << x << op << y << "=?" << std::endl;
}
};
// 响应类:result code <==> "result code"
class Response
{
public:
int _result;
int _code; // 0正确; 非0表明对应的错误原因
public:
Response(int result, int code)
: _result(result),
_code(code)
{
}
Response()
{
}
public:
//result code --> "result code"
bool Serialize(std::string *structedData)
{
#ifdef MySelf
std::string s = std::to_string(result);
s += blank_space_sep;
s += std::to_string(code);
*structedData = s;
return true;
#else
Json::Value root;
root["result"] = _result;
root["code"] = _code;
// Json::FastWriter w;
Json::StyledWriter w;
*structedData = w.write(root);
return true;
#endif
}
// "result code"-->result code
bool Deserialize(const std::string &stringMsg)
{
#ifdef MySelf
std::size_t pos = stringMsg.find(blank_space_sep);
if (pos == std::string::npos)
return false;
std::string part_left = stringMsg.substr(0, pos);
std::string part_right = stringMsg.substr(pos + 1);
result = std::stoi(part_left);
code = std::stoi(part_right);
return true;
#else
Json::Value root;
Json::Reader r;
r.parse(stringMsg, root);
_result = root["result"].asInt();
_code = root["code"].asInt();
return true;
#endif
}
void DebugPrint()
{
std::cout << "结果响应完成, result: " << _result << ", code: " << _code << std::endl;
}
};
Socket.hpp
#pragma once
#include <iostream>
#include <string>
#include <unistd.h>
#include <cstring>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <cerrno>
#include <cstring>
#include "Log.hpp"
enum
{
SocketErr = 2,
BindErr,
ListenErr
};
const int g_backlog = 10;
class Sock
{
private:
int _sockfd;
public:
Sock()
{
}
~Sock()
{
}
public:
void Socket()
{
_sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (_sockfd < 0)
{
lg(Fatal, "socker error, %s: %d", strerror(errno), errno);
exit(SocketErr);
}
}
void Bind(uint16_t port)
{
struct sockaddr_in local;
memset(&local, 0, sizeof(local));
local.sin_family = AF_INET;
local.sin_port = htons(port);
local.sin_addr.s_addr = INADDR_ANY;
if (bind(_sockfd, (struct sockaddr *)&local, sizeof(local)) < 0)
{
lg(Fatal, "bind error, %s: %d", strerror(errno), errno);
exit(BindErr);
}
}
void Listen()
{
if (listen(_sockfd, g_backlog) < 0)
{
lg(Fatal, "listen error, %s: %d", strerror(errno), errno);
exit(ListenErr);
}
}
int Accept(std::string *clientip, uint16_t *clientport)
{
struct sockaddr_in peer;
socklen_t len = sizeof(peer);
int newfd = accept(_sockfd, (struct sockaddr *)&peer, &len);
if (newfd < 0)
{
lg(Warning, "accept error, %s: %d", strerror(errno), errno);
return -1;
}
char ipstr[64];
inet_ntop(AF_INET, &peer.sin_addr, ipstr, sizeof(ipstr));
*clientip = ipstr;
*clientport = ntohs(peer.sin_port);
return newfd;
}
bool Connect(const std::string &ip, const uint16_t &port)
{
struct sockaddr_in peer;
memset(&peer, 0, sizeof(peer));
peer.sin_family = AF_INET;
peer.sin_port = htons(port);
inet_pton(AF_INET, ip.c_str(), &(peer.sin_addr));
int n = connect(_sockfd, (struct sockaddr *)&peer, sizeof(peer));
if (n == -1)
{
std::cerr << "connect to " << ip << ":" << port << " error" << std::endl;
return false;
}
return true;
}
void CloseFd()
{
close(_sockfd);
}
int getSocketFd()
{
return _sockfd;
}
};
TcpServer.hpp
#pragma once
#include <iostream>
#include <string>
#include <memory>
#include <cerrno>
#include <functional>
#include <unordered_map>
#include "Log.hpp"
#include "nocopy.hpp"
#include "Epoller.hpp"
#include "Socket.hpp"
#include "Common.hpp"
// tcp服务器会接受很多连接 上面我们提到完整报文的问题 要想解决这个问题 需要把之前的数据保留
// 一个链接对应一个文件描述符 需要给每个fd创建配对的发送/接收缓冲区
class TcpConnection;
class TcpServer;
uint32_t ET_EVENT_IN = (EPOLLIN | EPOLLET);
uint32_t ET_EVENT_OUT = (EPOLLOUT | EPOLLET);
const static int g_buffer_size = 128;
using func_t = std::function<void(std::weak_ptr<TcpConnection>)>;
using except_func = std::function<void(std::weak_ptr<TcpConnection>)>;
// Tcp连接类 一个连接有: fd 收发缓冲区 对端ip+port 网络操作回调函数
class TcpConnection
{
private:
int _sockFd;
std::string _inbuffer; // string无法处理二进制流 vector可以
std::string _outbuffer;
public:
func_t _recv_cb;
func_t _send_cb;
except_func _except_cb;
// 添加一个回指指针:tcp连接通过该指针 调用tcp服务器接口
// std::shared_ptr<TcpServer> _tcp_server_ptr;
std::weak_ptr<TcpServer> _tcpSvr_WeakPtr;
std::string _clientIp;
uint16_t _clientPort;
public:
TcpConnection(int sockFd)
: _sockFd(sockFd)
{
}
void SetHandler(func_t recv_cb, func_t send_cb, except_func except_cb)
{
_recv_cb = recv_cb;
_send_cb = send_cb;
_except_cb = except_cb;
}
int getSockFd()
{
return _sockFd;
}
std::string &getInbuffer()
{
return _inbuffer;
}
std::string &getOutBuffer()
{
return _outbuffer;
}
void AppendInBuffer(const std::string &info)
{
_inbuffer += info;
}
void AppendOutBuffer(const std::string &info)
{
_outbuffer += info;
}
void SetWeakPtr(std::weak_ptr<TcpServer> tcp_server_ptr)
{
_tcpSvr_WeakPtr = tcp_server_ptr;
}
~TcpConnection()
{
}
};
class TcpServer : public std::enable_shared_from_this<TcpServer>, public nocopy
{
static const int num = 64;
private:
std::shared_ptr<Epoller> _epoller_ptr; // 内核
std::shared_ptr<Sock> _listensock_ptr; // 监听socket 可以把他移除到外部
std::unordered_map<int, std::shared_ptr<TcpConnection>> _tcpConnections;
struct epoll_event revs[num];
uint16_t _tcpPort;
bool _quit;
func_t _MsgHandler; // 让上层处理信息
public:
TcpServer(uint16_t port, func_t MsgHandler)
: _tcpPort(port),
_MsgHandler(MsgHandler),
_quit(true),
_epoller_ptr(new Epoller()),
_listensock_ptr(new Sock())
{
}
// socket bind listen
void Init()
{
// 1.建立连接 绑定端口号 处于监听状态
_listensock_ptr->Socket();
int fd = _listensock_ptr->getSocketFd();
SetNonBlockOrDie(fd);
_listensock_ptr->Bind(_tcpPort);
_listensock_ptr->Listen();
lg(Info, "create listen socket success: %d", fd);
// 2.将监听套接字设置进epoll模型 监听套接字只用处理read事件
// void Accepter(std::weak_ptr<Connection> connectionWeakPtr)
// bind --> func_t recv_cb 调用:_connections[sock]->_recv_cb(_connections[sock]);
// _recv_cb(_connections[sock]); -> AccepterBind(_connections[sock])->Accepter(_connections[sock])
// 目的:将类内成员函数转换为function对象传参 因为TcpServer::Accepter单独传不过去
// 不存在"void (weak_ptr<Connection> conn)"-->"function<void (weak_ptr<Connection>)>" 适当构造函数
// AddConnection(fd, ET_EVENT_IN, TcpServer::Accepter, nullptr, nullptr);
auto AccepterBind = std::bind(&TcpServer::Accepter, this, std::placeholders::_1);
AddConnection(fd, ET_EVENT_IN, AccepterBind, nullptr, nullptr);
}
void Loop()
{
_quit = false;
while (!_quit)
{
Dispatcher(-1); // Dispatcher(3000);
PrintConnection();
}
_quit = true;
}
void PrintConnection()
{
std::cout << "_connections fd list: ";
for (auto &connection : _tcpConnections)
{
std::cout << connection.second->getSockFd() << ", ";
std::cout << "inbuffer: " << connection.second->getInbuffer().c_str();
}
std::cout << std::endl;
}
// using func_t = std::function<void (std::weak_ptr<Connection>)>
void AddConnection(int sockFd, uint32_t event, func_t recv_cb, func_t send_cb, except_func except_cb,
const std::string &ip = "0.0.0.0", uint16_t port = 0)
{
// 1. 将sockFd添加到TcpConnection中
// TcpConnection(int sock): _sockFd(sock){}
std::shared_ptr<TcpConnection> new_connection(new TcpConnection(sockFd));
// shared_ptr<TcpServer> enable_shared_from_this<TcpServer>::shared_from_this()返回当前对象的shared_ptr
// void SetWeakPtr(std::weak_ptr<TcpServer> tcp_server_ptr)
new_connection->SetWeakPtr(shared_from_this());
new_connection->SetHandler(recv_cb, send_cb, except_cb);
new_connection->_clientIp = ip;
new_connection->_clientPort = port;
// 2. 添加到unordered_map: 连接map 保存{fd : tcpConnection}
_tcpConnections.insert(std::make_pair(sockFd, new_connection));
// 3. 添加fd及其对应的事件到内核中
_epoller_ptr->EpllerUpdate(EPOLL_CTL_ADD, sockFd, event);
lg(Debug, "add a new connection success, sockfd is : %d", sockFd);
}
// 链接管理器
void Accepter(std::weak_ptr<TcpConnection> connectionWeakPtr)
{
auto connectionSharedPtr = connectionWeakPtr.lock();
while (true)
{
struct sockaddr_in peer;
socklen_t len = sizeof(peer);
//::表示直接调用系统接口
int serverSockFd = ::accept(connectionSharedPtr->getSockFd(), (struct sockaddr *)&peer, &len);
if (serverSockFd > 0)
{
uint16_t peerport = ntohs(peer.sin_port);
char ipbuf[128];
inet_ntop(AF_INET, &peer.sin_addr.s_addr, ipbuf, sizeof(ipbuf));
lg(Debug, "get a new client, get info-> [%s:%d], sockfd : %d", ipbuf, peerport, serverSockFd);
// 监听套接字只需要设置_recv_cb 其他如此处的服务套接字读写异常都要设置
SetNonBlockOrDie(serverSockFd);
AddConnection(serverSockFd, ET_EVENT_IN,
std::bind(&TcpServer::Recver, this, std::placeholders::_1),
std::bind(&TcpServer::Sender, this, std::placeholders::_1),
std::bind(&TcpServer::Excepter, this, std::placeholders::_1),
ipbuf, peerport);
}
else
{
// 操作当前不能立即完成 因为资源暂时不可用 非阻塞套接字上没有数据可读或可写
if (errno == EWOULDBLOCK)
break;
// 系统调用被信号中断 系统调用可以在中断之后重新尝试
else if (errno == EINTR)
continue;
else
break;
}
}
}
// 事件管理器 不用关心数据的格式 服务器只要IO数据就可以 数据完整性/报文格式等由协议管理
void Recver(std::weak_ptr<TcpConnection> connectionWeakPtr)
{
if (connectionWeakPtr.expired())
return;
auto connectionSharedPtr = connectionWeakPtr.lock();
int sockFd = connectionSharedPtr->getSockFd();
while (true)
{
char buffer[g_buffer_size];
memset(buffer, 0, sizeof(buffer));
ssize_t n = recv(sockFd, buffer, sizeof(buffer) - 1, 0); // 非阻塞读取
if (n > 0)
connectionSharedPtr->AppendInBuffer(buffer);
else if (n == 0)
{
lg(Info, "sockfd: %d, client info [%s:%d] quit...", sockFd, connectionSharedPtr->_clientIp.c_str(), connectionSharedPtr->_clientPort);
connectionSharedPtr->_except_cb(connectionSharedPtr);
return;
}
else
{
if (errno == EWOULDBLOCK)
break;
else if (errno == EINTR)
continue;
else
{
lg(Warning, "sockfd: %d, client info [%s:%d] recv error...", sockFd, connectionSharedPtr->_clientIp.c_str(), connectionSharedPtr->_clientPort);
connectionSharedPtr->_except_cb(connectionSharedPtr);
return;
}
}
}
// 数据有了 但是不一定完整 检测 收到完整报文后处理
_MsgHandler(connectionSharedPtr);
}
void Sender(std::weak_ptr<TcpConnection> connectionWeakPtr)
{
if (connectionWeakPtr.expired())
return;
auto connectionSharedPtr = connectionWeakPtr.lock();
auto &outbuffer = connectionSharedPtr->getOutBuffer();
while (true)
{
ssize_t n = send(connectionSharedPtr->getSockFd(), outbuffer.c_str(), outbuffer.size(), 0);
if (n > 0)
{
outbuffer.erase(0, n);
if (outbuffer.empty())
break;
}
else if (n == 0)
return;
else
{
if (errno == EWOULDBLOCK)
break;
else if (errno == EINTR)
continue;
else
{
lg(Warning, "sockfd: %d, client info %s:%d send error...", connectionSharedPtr->getSockFd(), connectionSharedPtr->_clientIp.c_str(), connectionSharedPtr->_clientPort);
connectionSharedPtr->_except_cb(connectionSharedPtr);
return;
}
}
}
if (!outbuffer.empty()) // 开启对写事件的关心
EnableEvent(connectionSharedPtr->getSockFd(), true, true);
else // 关闭对写事件的关心
EnableEvent(connectionSharedPtr->getSockFd(), true, false);
}
void Excepter(std::weak_ptr<TcpConnection> connectionWeakPtr)
{
if (connectionWeakPtr.expired())
return;
auto connectionSharedPtr = connectionWeakPtr.lock();
int fd = connectionSharedPtr->getSockFd();
lg(Warning, "Excepter hander sockfd: %d, client info %s:%d excepter handler",
connectionSharedPtr->getSockFd(), connectionSharedPtr->_clientIp.c_str(), connectionSharedPtr->_clientPort);
// 1. 移除对特定fd的关心
// EnableEvent(connection->SockFd(), false, false);
_epoller_ptr->EpllerUpdate(EPOLL_CTL_DEL, fd, 0);
// 2. 关闭异常的文件描述符
lg(Debug, "close %d done...\n", fd);
close(fd);
// 3. 从unordered_map中移除
lg(Debug, "remove %d from _connections...\n", fd);
_tcpConnections.erase(fd);
}
void EnableEvent(int sockFd, bool readable, bool writeable)
{
uint32_t events = 0;
events |= ((readable ? EPOLLIN : 0) | (writeable ? EPOLLOUT : 0) | EPOLLET);
_epoller_ptr->EpllerUpdate(EPOLL_CTL_MOD, sockFd, events);
}
bool IsConnectionSafe(int fd)
{
auto iter = _tcpConnections.find(fd);
if (iter == _tcpConnections.end())
return false;
else
return true;
}
void Dispatcher(int timeout)
{
int n = _epoller_ptr->EpollerWait(revs, num);
for (int i = 0; i < n; i++)
{
uint32_t events = revs[i].events;
int sockFd = revs[i].data.fd;
// 统一把事件异常转换成为读写问题
// if (events & EPOLLERR)
// events |= (EPOLLIN | EPOLLOUT);
// if (events & EPOLLHUP)
// events |= (EPOLLIN | EPOLLOUT);
// 只需要处理EPOLLIN EPOLLOUT
if ((events & EPOLLIN) && IsConnectionSafe(sockFd))
{
if (_tcpConnections[sockFd]->_recv_cb)
_tcpConnections[sockFd]->_recv_cb(_tcpConnections[sockFd]);
}
if ((events & EPOLLOUT) && IsConnectionSafe(sockFd))
{
if (_tcpConnections[sockFd]->_send_cb)
_tcpConnections[sockFd]->_send_cb(_tcpConnections[sockFd]);
}
}
}
~TcpServer()
{
}
};
查看本服务器代码行数
find . -type f -name "*.hpp" -o -name "*.cc" | xargs wc -l
177 ./Log.hpp
82 ./ClientCal.cc
75 ./testBind.cc
377 ./TcpServer.hpp
62 ./Epoller.hpp
82 ./Calculator.hpp
44 ./Main.cc
8 ./nocopy.hpp
14 ./Common.hpp
194 ./Protocol.hpp
118 ./Socket.hpp
1233 total