1.为什么要用epoll?
前面使用socket相关接口编写了一个echo服务器,但是存在以下问题:
- (1) 该服务器只能处理一个客户端连接;
- (2) 在绝大部分时间,这个连接都是空闲的,造成了极大的资源浪费。
这时,就需要epoll上场了。epoll是Linux内核为处理大批量文件描述符而改进的poll,是Linux下多路复用IO接口select/poll的增强版本,它能显著提高程序在大量并发连接中只有少量活跃的情况下的系统CPU利用率。
2.epoll的工作方式
- [1] LT(Level Triggered)水平触发模式,是默认的工作方式,同时支持阻塞(blocking)和非阻塞(non-blocking)的socket。这种工作方式下,只要socket上还有未读完的数据(或发送缓冲区仍然可写),就会一直产生EPOLLIN(或EPOLLOUT)事件;
- [2] ET(Edge Triggered)边沿触发模式,只应搭配非阻塞(non-blocking)的socket使用。这种工作方式下,socket上每次有新数据到来时只会触发一次;若触发后未将socket上的数据读完,则不会再次触发,除非又有新数据到来。
也就是说,对于一个非阻塞socket,若使用epoll边沿触发模式去检测数据是否可读,触发可读事件后,一定要一次性将socket上数据收取干净才行(即,循环调用recv函数直到recv出错,错误码为EWOULDBLOCK/EAGAIN,表示socket上本次数据已读完);若是水平触发模式,则无需如此,你可以根据业务一次性收取固定字节数/收完为止。
3.相关代码
- 网络地址封装
InetAddress
头文件
#pragma once
#include <netinet/in.h>
#include <string>
// Value wrapper around a single IPv4 endpoint stored as a `sockaddr_in`.
class InetAddress {
 public:
  // Builds an endpoint from a host-byte-order port and a dotted-quad IPv4
  // string; defaults to port 0 on the loopback address.
  explicit InetAddress(uint16_t port = 0, const std::string &ip = "127.0.0.1");

  // Wraps an already filled-in `sockaddr_in` (stored by copy).
  explicit InetAddress(sockaddr_in addr);

  // --- raw access -----------------------------------------------------------

  // Overwrites the stored address with a copy of `addr`.
  void setAddress(const sockaddr_in &addr);

  // Pointer to the stored address; valid for as long as this object lives.
  const sockaddr_in *getAddress() const;

  // --- textual / numeric views ----------------------------------------------

  // IP part as text, e.g. "127.0.0.1".
  std::string toIp() const;

  // Port in host byte order.
  uint16_t toPort() const;

  // "<ip>:<port>" as text.
  std::string toIpPort() const;

 private:
  sockaddr_in addr_;
};
- 网络地址封装
InetAddress
实现文件
/*
* @Author: chenjingyu
* @Date: 2024-09-06 14:10:10
* @Contact: 2458006466@qq.com
* @Description: InetAddress
*/
#include "Core/InetAddress.h"
#include <arpa/inet.h>
#include <string.h>
// Builds an IPv4 endpoint from a host-order port and a dotted-quad string.
// The whole structure is zeroed first so that unused fields are well defined.
// NOTE(review): inet_addr() reports unparsable input through its return value,
// which is stored here without being checked.
InetAddress::InetAddress(uint16_t port, const std::string &ip) {
  ::memset(&addr_, 0, sizeof addr_);
  addr_.sin_addr.s_addr = ::inet_addr(ip.c_str());
  addr_.sin_port = htons(port);  // the wire format is network byte order
  addr_.sin_family = AF_INET;
}
// Wraps a caller-supplied address. `sockaddr_in` is a plain C struct, so the
// by-value parameter is simply copied into the member.
InetAddress::InetAddress(sockaddr_in addr) : addr_(addr) {}
// Renders the stored IPv4 address in dotted-quad text form.
// The scratch buffer starts zero-filled, so the result is an empty string if
// the conversion writes nothing.
std::string InetAddress::toIp() const {
  char text[64] = {};
  ::inet_ntop(AF_INET, &addr_.sin_addr, text, sizeof text);
  return std::string(text);
}
// Renders the endpoint as "<ip>:<port>", with the port in host byte order.
std::string InetAddress::toIpPort() const {
  std::string endpoint = toIp();
  endpoint += ":";
  endpoint += std::to_string(toPort());
  return endpoint;
}
// Returns the stored port converted from network to host byte order.
uint16_t InetAddress::toPort() const {
  const uint16_t host_order_port = ntohs(addr_.sin_port);
  return host_order_port;
}
void InetAddress::setAddress(const sockaddr_in &addr) {
addr_ = addr;
}
// Exposes the stored endpoint for system calls such as bind(); the pointer
// refers to a member and is only valid while this object is alive.
const sockaddr_in *InetAddress::getAddress() const {
  const sockaddr_in *stored = &addr_;
  return stored;
}
Socket
封装头文件
/*
* @Author: chenjingyu
* @Date: 2024-09-06 14:01:27
* @Contact: 2458006466@qq.com
* @Description: Socket
*/
#pragma once
class InetAddress;

// Wrapper around a TCP socket file descriptor.
//
// The destructor closes the descriptor it holds.
// NOTE(review): the class is implicitly copyable, so a copy would close the
// same descriptor twice; treat instances as non-copyable.
class Socket {
 public:
  // Creates a new IPv4 stream socket.
  Socket();
  // Adopts an existing descriptor.
  explicit Socket(int sockfd);
  // Closes the held descriptor.
  ~Socket();

  // Returns the held descriptor.
  int sockfd() const;
  // Replaces the held descriptor; the previous one is NOT closed.
  void setSockfd(int sockfd);

  // Binds the socket to `local_addr`.
  void BindAddress(const InetAddress &local_addr);
  // Puts the socket into listening state.
  void Listen();
  // Accepts one pending connection. Returns the new descriptor, or -1 on
  // failure; on success the peer's address is written to `*peer_addr`.
  int Accept(InetAddress *peer_addr);
  // Shuts down the write half of the connection.
  void ShutdownWrite();

  // Socket option toggles (TCP_NODELAY, SO_REUSEADDR, SO_REUSEPORT,
  // SO_KEEPALIVE).
  void setTcpNoDelay(bool on);
  void setReuseAddr(bool on);
  void setReusePort(bool on);
  void setKeepAlive(bool on);

  // Adds O_NONBLOCK to the descriptor's status flags.
  void setNonBlocking();

 private:
  int sockfd_;
};
Socket
封装实现文件
/*
* @Author: chenjingyu
* @Date: 2024-09-06 14:05:49
* @Contact: 2458006466@qq.com
* @Description: Socket
*/
#include "Socket.h"
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/tcp.h>
#include <string.h>
#include <fcntl.h>
#include "Logger.h"
#include "InetAddress.h"
// Opens a fresh IPv4 TCP socket; creation failure goes through CHECK.
Socket::Socket() : sockfd_(::socket(AF_INET, SOCK_STREAM, 0)) {
  CHECK(sockfd_ != -1) << "Socket create error.";
}
Socket::Socket(int sockfd) : sockfd_(sockfd) {}
// Closes the held descriptor, if any.
// A value of -1 means "no descriptor", so close() is skipped in that case
// rather than issuing a call that can only fail with EBADF.
Socket::~Socket() {
  if (sockfd_ != -1) {
    ::close(sockfd_);
    sockfd_ = -1;
  }
}
// Returns the descriptor currently held by this wrapper.
int Socket::sockfd() const {
  const int held_fd = sockfd_;
  return held_fd;
}
void Socket::setSockfd(int sockfd) {
sockfd_ = sockfd;
}
// Binds the socket to `local_addr`; failure goes through CHECK.
void Socket::BindAddress(const InetAddress &local_addr) {
  // bind() takes the generic sockaddr view of the IPv4 structure; keep it
  // const because the address is only read.
  const sockaddr *generic_addr =
      reinterpret_cast<const sockaddr *>(local_addr.getAddress());
  const int ret = ::bind(sockfd_, generic_addr, sizeof(sockaddr_in));
  CHECK(ret != -1) << "Bind sockfd: " << sockfd_ << " failed.";
}
// Marks the socket as passive, using the system-wide maximum backlog
// (SOMAXCONN); failure goes through CHECK.
void Socket::Listen() {
  const int ret = ::listen(sockfd_, SOMAXCONN);
  CHECK(ret != -1) << "Failed Listen fd: " << sockfd_;
}
// Accepts one pending connection on the listening socket.
//
// Returns the new connection's descriptor, or -1 on failure (errno is left as
// set by accept()). On success the peer's address is stored in `*peer_addr`
// when `peer_addr` is non-null.
int Socket::Accept(InetAddress *peer_addr) {
  sockaddr_in clnt_addr;
  ::memset(&clnt_addr, 0, sizeof(clnt_addr));
  // accept() treats the length as a value-result argument: it must hold the
  // buffer size on entry, otherwise the kernel sees an indeterminate value.
  socklen_t addr_len = sizeof(clnt_addr);
  const int clnt_sockfd =
      ::accept(sockfd_, reinterpret_cast<sockaddr *>(&clnt_addr), &addr_len);
  if (clnt_sockfd != -1 && peer_addr != nullptr) {
    peer_addr->setAddress(clnt_addr);
  }
  return clnt_sockfd;
}
// Closes the sending half of the connection; a failure is only logged.
void Socket::ShutdownWrite() {
  if (::shutdown(sockfd_, SHUT_WR) >= 0) {
    return;
  }
  LogError("Socket::ShutdownWrite Error.");
}
// Sets (on == true) or clears TCP_NODELAY on the socket.
// The setsockopt() result is not inspected.
void Socket::setTcpNoDelay(bool on) {
  const int enabled = static_cast<int>(on);  // true -> 1, false -> 0
  ::setsockopt(sockfd_, IPPROTO_TCP, TCP_NODELAY, &enabled, sizeof enabled);
}
// Sets (on == true) or clears SO_REUSEPORT on the socket.
// The setsockopt() result is not inspected.
void Socket::setReusePort(bool on) {
  const int enabled = static_cast<int>(on);  // true -> 1, false -> 0
  ::setsockopt(sockfd_, SOL_SOCKET, SO_REUSEPORT, &enabled, sizeof enabled);
}
// Sets (on == true) or clears SO_REUSEADDR on the socket.
// The setsockopt() result is not inspected.
void Socket::setReuseAddr(bool on) {
  const int enabled = static_cast<int>(on);  // true -> 1, false -> 0
  ::setsockopt(sockfd_, SOL_SOCKET, SO_REUSEADDR, &enabled, sizeof enabled);
}
// Sets (on == true) or clears SO_KEEPALIVE on the socket.
// The setsockopt() result is not inspected.
void Socket::setKeepAlive(bool on) {
  const int enabled = static_cast<int>(on);  // true -> 1, false -> 0
  ::setsockopt(sockfd_, SOL_SOCKET, SO_KEEPALIVE, &enabled, sizeof enabled);
}
// Adds O_NONBLOCK to the descriptor's file status flags, keeping the others.
void Socket::setNonBlocking() {
  const int flags = ::fcntl(sockfd_, F_GETFL, 0);
  if (flags == -1) {
    // Reading the flags failed; OR-ing -1 with O_NONBLOCK would hand an
    // all-bits-set mask to F_SETFL, so leave the descriptor untouched.
    return;
  }
  ::fcntl(sockfd_, F_SETFL, flags | O_NONBLOCK);
}
Epoller
封装头文件
/*
* @Author: chenjingyu
* @Date: 2024-09-06 16:24:09
* @Contact: 2458006466@qq.com
* @Description: Epoller
*/
#pragma once
#include <vector>
#include <sys/epoll.h>
class Channel;

// Wrapper around one epoll instance plus the buffer epoll_wait() fills.
//
// Two registration styles are offered and should not be mixed on one instance,
// because they use different members of the same `epoll_event::data` union:
//   * AddFd / Poll                 -> data.fd holds the descriptor
//   * UpdateChannel / PollChannels -> data.ptr holds the Channel pointer
//
// NOTE(review): the class owns `evts_` through a raw pointer and is implicitly
// copyable; copying an instance would free the buffer twice.
class Epoller {
 public:
  // Creates the epoll instance and allocates the event buffer.
  Epoller();
  // Closes the epoll instance and frees the event buffer.
  ~Epoller();

  // --- descriptor-based interface -------------------------------------------

  // Registers `fd` with event mask `op` (EPOLLIN, EPOLLET, ...).
  void AddFd(int fd, uint32_t op);

  // Waits up to `timeout` milliseconds (-1 = no limit) and returns a copy of
  // the ready events.
  std::vector<epoll_event> Poll(int timeout = -1);

  // --- Channel-based interface ----------------------------------------------

  // Adds `channel` to the epoll set, or modifies its event mask if it reports
  // that it is already registered.
  void UpdateChannel(Channel *channel);

  // Waits like Poll(), stores the ready mask on each channel and returns the
  // ready channels.
  std::vector<Channel*> PollChannels(int timeout = -1);

 private:
  int epfd_;
  struct epoll_event *evts_;
};
Epoller
实现文件
/*
* @Author: chenjingyu
* @Date: 2024-09-06 16:24:13
* @Contact: 2458006466@qq.com
* @Description: Epoller
*/
#include "Epoller.h"
#include "Logger.h"
#include <unistd.h>
#include <string.h>
#define MAX_EVENTS 1000
// Creates the epoll instance (failure goes through CHECK) and allocates a
// zero-filled buffer of MAX_EVENTS entries for epoll_wait() results.
Epoller::Epoller() : epfd_(::epoll_create1(0)), evts_(nullptr) {
  CHECK(epfd_ != -1) << "epoll create error.";
  // The trailing () value-initialises the array, i.e. every entry is zeroed.
  evts_ = new epoll_event[MAX_EVENTS]();
}
// Releases the event buffer and closes the epoll descriptor if one is open.
Epoller::~Epoller() {
  delete[] evts_;
  evts_ = nullptr;

  if (epfd_ == -1) {
    return;
  }
  ::close(epfd_);
  epfd_ = -1;
}
// Registers `fd` for the events in `op`. The descriptor itself is stored in
// the event's user data so that Poll() callers can read it back from data.fd.
// Failure goes through CHECK.
void Epoller::AddFd(int fd, uint32_t op) {
  epoll_event ev;
  ::memset(&ev, 0, sizeof ev);
  ev.events = op;
  ev.data.fd = fd;

  const int ret = ::epoll_ctl(epfd_, EPOLL_CTL_ADD, fd, &ev);
  CHECK(ret != -1) << "epoll add event error.";
}
// Waits for events for up to `timeout` milliseconds (-1 = indefinitely) and
// returns a copy of every ready event.
// NOTE(review): epoll_wait() also returns -1 when interrupted by a signal
// (EINTR); that case goes through the same CHECK as a real error.
std::vector<epoll_event> Epoller::Poll(int timeout) {
  const int nfds = ::epoll_wait(epfd_, evts_, MAX_EVENTS, timeout);
  CHECK(nfds != -1) << "epoll wait error.";

  std::vector<epoll_event> result;
  int idx = 0;
  while (idx < nfds) {
    result.push_back(evts_[idx]);
    ++idx;
  }
  return result;
}
// Synchronises `channel` with the epoll set: a channel that reports it is
// already registered has its event mask modified, any other channel is added.
// The channel pointer is stored as the event's user data (data.ptr).
// NOTE(review): this function calls Channel members, so the full Channel
// definition must be visible in this translation unit; it also never marks the
// channel as registered itself -- presumably the caller does that.
void Epoller::UpdateChannel(Channel *channel) {
  const int sockfd = channel->sockfd();

  epoll_event ev;
  ::memset(&ev, 0, sizeof ev);
  ev.data.ptr = channel;
  ev.events = channel->events();

  if (channel->inEpoller()) {
    LogInfo("epoll modify {}.", sockfd);
    const int ret = ::epoll_ctl(epfd_, EPOLL_CTL_MOD, sockfd, &ev);
    CHECK(ret != -1) << "epoll modify error.";
    return;
  }

  LogInfo("epoll add {}.", sockfd);
  const int ret = ::epoll_ctl(epfd_, EPOLL_CTL_ADD, sockfd, &ev);
  CHECK(ret != -1) << "epoll add error.";
}
std::vector<Channel*> Epoller::PollChannels(int timeout) {
std::vector<Channel*> result;
int nfds = epoll_wait(epfd_, evts_, MAX_EVENTS, timeout);
for (int i = 0; i < nfds; ++i) {
Channel *channel = (Channel*) evts_[i].data.ptr;
channel->setRevents(evts_[i].events);
result.emplace_back(channel);
}
return result;
}
- 测试代码:
/*
* @Author: chenjingyu
* @Date: 2024-09-06 14:46:11
* @Contact: 2458006466@qq.com
* @Description: TestEpoller
*/
#include "Logger.h"
#include "Epoller.h"
#include "InetAddress.h"
#include "Socket.h"
#include <errno.h>
#include <fcntl.h>
#include <string.h>
#include <unistd.h>
#define READ_BUFFER 1024
void HandleReadEvent(int);
int main(int argc, char *argv[]) {
Socket socket;
InetAddress addr(8080, "0.0.0.0");
socket.BindAddress(addr);
socket.Listen();
socket.setNonBlocking();
Epoller epoller;
epoller.AddFd(socket.sockfd(), EPOLLIN | EPOLLET);
InetAddress clnt_addr;
Socket clnt_socket;
while (true) {
std::vector<epoll_event> evts = epoller.Poll();
int nfds = evts.size();
for (int i = 0; i < nfds; ++i) {
if (evts[i].data.fd == socket.sockfd()) {
// 新客户端连接
clnt_socket.setSockfd(socket.Accept(&clnt_addr));
LogInfo("new client fd: {}, IP: {}, Port: {}.", clnt_socket.sockfd(),
clnt_addr.toIp(), clnt_addr.toPort());
clnt_socket.setNonBlocking();
epoller.AddFd(clnt_socket.sockfd(), EPOLLIN | EPOLLET);
} else if (evts[i].events & EPOLLIN) {
// 可读事件
HandleReadEvent(evts[i].data.fd);
} else {
LogInfo("something else happened.");
}
}
}
return 0;
}
void HandleReadEvent(int sockfd) {
char buf[READ_BUFFER];
// 由于使用非阻塞IO,读取客户端buffer,一次读取buf大小数据,直到全部读取完毕
while (true) {
bzero(&buf, sizeof(buf));
ssize_t bytes_read = read(sockfd, buf, sizeof(buf));
if (bytes_read > 0) {
LogInfo("message from client fd {}: {}", sockfd, buf);
write(sockfd, buf, sizeof(buf));
} else if (bytes_read == -1 && errno == EINTR) {
// 客户端正常中断、继续读取
LogInfo("continue reading");
continue;
} else if (bytes_read == -1 &&
((errno == EAGAIN) || (errno == EWOULDBLOCK))) {
// 非阻塞IO,这个条件表示数据全部读取完毕
LogInfo("finish reading once, errno: {}", errno);
break;
} else if (bytes_read == 0) {
// EOF,客户端断开连接
LogInfo("EOF, client fd {} disconnected", sockfd);
close(sockfd); //关闭socket会自动将文件描述符从epoll树上移除
break;
}
}
}
- 编译
Makefile
文件
# Neither target produces a file named after itself, so mark both as phony.
.PHONY: server clean

# Build the echo-server binary from all translation units.
server:
	g++ Server.cc Logger.cc InetAddress.cc Epoller.cc Socket.cc -o Server

# Remove the binary; -f keeps this from failing when it does not exist.
clean:
	rm -f Server
- 测试
# 服务端启动
>> ./Server
# 测试连接
>> nc 127.0.0.1 8080
参考资料: