Epoll编程学习笔记

MirrorYuChen
MirrorYuChen
发布于 2024-11-12 / 13 阅读
0
0

Epoll编程学习笔记

1.为什么要用epoll?

前面使用socket相关接口编写了一个echo服务器,但是存在以下问题:

  • (1) 该服务器只能处理一个客户端连接;
  • (2) 在绝大部分时间,这个连接都是空闲的,造成了极大的资源浪费。
    这时,就需要epoll上场了,epoll是linux内核为处理大批量文件描述符而改进的poll,是linux下多路复用IO接口select/poll的增强版本,它能显著提高程序在大量并发连接中只有少量活跃的情况下的系统CPU利用率;

2.epoll的工作方式

  • [1] LT(Level Triggered)水平触发模式,是默认的工作方式,同时支持阻塞(blocking)和非阻塞(non-blocking)的socket,这种工作方式下,只要socket上还有未读完/未写完的数据,就会一直产生EPOLLIN/EPOLLOUT事件;
  • [2] ET(Edge Triggered)边沿触发模式,仅支持非阻塞(non-blocking)的socket,这种工作方式下,socket上每次有新数据到来时,只会触发一次,若触发后未将socket上的数据读完,则不会再次触发,除非又有新的数据到来;
    也就是说,对于一个非阻塞socket,若使用epoll边沿触发模式去检测数据是否可读,触发可读事件后,一定要一次性将socket上数据收取干净才行(即,循环调用recv函数直到recv出错,错误码为EWOULDBLOCK/EAGAIN,表示socket上本次数据已读完);若是水平触发模式,则无需如此,你可以根据业务一次性收取固定字节数/收完为止。

3.相关代码

  • 网络地址封装 InetAddress头文件
#pragma once

#include <netinet/in.h>
#include <string>

/// Value-type wrapper around a single IPv4 `sockaddr_in`.
class InetAddress {
public:
  /// Builds an AF_INET address.
  /// @param port Port number in host byte order.
  /// @param ip   Dotted-quad IPv4 text.
  explicit InetAddress(uint16_t port = 0,
                       const std::string &ip = "127.0.0.1");

  /// Wraps an already filled-in socket address.
  explicit InetAddress(sockaddr_in addr);

  /// @return The IP part as text.
  std::string toIp() const;

  /// @return "<ip>:<port>".
  std::string toIpPort() const;

  /// @return The port in host byte order.
  uint16_t toPort() const;

  /// Replaces the stored address with a copy of `addr`.
  void setAddress(const sockaddr_in &addr);

  /// @return Pointer to the stored address; valid while this object lives.
  const sockaddr_in *getAddress() const;

private:
  sockaddr_in addr_;  // the wrapped IPv4 address
};
  • 网络地址封装 InetAddress实现文件
/*
 * @Author: chenjingyu
 * @Date: 2024-09-06 14:10:10
 * @Contact: 2458006466@qq.com
 * @Description: InetAddress
 */
#include "Core/InetAddress.h"
#include <arpa/inet.h>
#include <string.h>

// Builds an IPv4 address from a host-order port and a dotted-quad string.
// The text is converted with inet_addr() and its result is stored as-is.
InetAddress::InetAddress(uint16_t port, const std::string &ip) {
  // Zero the whole structure first so no field is left indeterminate.
  memset(&addr_, 0, sizeof(addr_));

  const in_addr_t packed_ip = inet_addr(ip.c_str());
  addr_.sin_addr.s_addr = packed_ip;
  addr_.sin_port = htons(port);  // the wire format is network byte order
  addr_.sin_family = AF_INET;
}

// Wraps an existing sockaddr_in. The struct is a plain C aggregate, so it is
// simply copied: std::move would not make it cheaper and would require
// <utility>, which this file does not include.
InetAddress::InetAddress(sockaddr_in addr) : addr_(addr) {}

// Renders the stored IPv4 address as text using inet_ntop().
std::string InetAddress::toIp() const {
  char text[64] = {0};
  inet_ntop(AF_INET, &addr_.sin_addr, text, sizeof(text));
  return std::string(text);
}

// Returns the address formatted as "<ip>:<port>".
std::string InetAddress::toIpPort() const {
  std::string endpoint = toIp();
  endpoint += ":";
  endpoint += std::to_string(toPort());
  return endpoint;
}

// Returns the stored port converted from network to host byte order.
uint16_t InetAddress::toPort() const {
  const uint16_t host_order_port = ntohs(addr_.sin_port);
  return host_order_port;
}

void InetAddress::setAddress(const sockaddr_in &addr) {
  addr_ = addr;
}

// Exposes the wrapped address read-only (e.g. for passing to bind()).
const sockaddr_in *InetAddress::getAddress() const {
  const sockaddr_in *stored = &addr_;
  return stored;
}
  • Socket封装头文件
/*
 * @Author: chenjingyu
 * @Date: 2024-09-06 14:01:27
 * @Contact: 2458006466@qq.com
 * @Description: Socket
 */
#pragma once

class InetAddress;

/// Wrapper around a socket file descriptor.
///
/// The destructor closes the wrapped descriptor.
/// NOTE(review): the class is implicitly copyable, so a copy would close the
/// same descriptor twice -- consider deleting the copy operations once all
/// callers are confirmed not to copy.
class Socket {
public:
  /// Creates a new AF_INET / SOCK_STREAM socket.
  Socket();
  /// Wraps an existing descriptor.
  explicit Socket(int sockfd);
  /// Closes the wrapped descriptor.
  ~Socket();

  /// @return The wrapped descriptor.
  int sockfd() const;
  /// Replaces the wrapped descriptor; the previous one is NOT closed.
  void setSockfd(int sockfd);
  /// Binds the socket to `local_addr`.
  void BindAddress(const InetAddress &local_addr);
  /// Starts listening with a backlog of SOMAXCONN.
  void Listen();
  /// Accepts one connection.
  /// @param peer_addr Receives the client address on success.
  /// @return The client descriptor, or -1 on failure.
  int Accept(InetAddress *peer_addr);

  /// Shuts down the write half of the connection.
  void ShutdownWrite();

  // Socket option toggles (each maps to one setsockopt call).
  void setTcpNoDelay(bool on);  ///< TCP_NODELAY
  void setReuseAddr(bool on);   ///< SO_REUSEADDR
  void setReusePort(bool on);   ///< SO_REUSEPORT
  void setKeepAlive(bool on);   ///< SO_KEEPALIVE

  /// Adds O_NONBLOCK to the descriptor's file status flags.
  void setNonBlocking();

private:
  int sockfd_;  // wrapped descriptor
};
  • Socket封装实现文件
/*
 * @Author: chenjingyu
 * @Date: 2024-09-06 14:05:49
 * @Contact: 2458006466@qq.com
 * @Description: Socket
 */
#include "Socket.h"
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/tcp.h>
#include <string.h>
#include <fcntl.h>

#include "Logger.h"
#include "InetAddress.h"

// Creates a fresh IPv4 TCP socket; CHECK reports a failed socket() call.
Socket::Socket() : sockfd_(::socket(AF_INET, SOCK_STREAM, 0)) {
  CHECK(sockfd_ != -1) << "Socket create error.";
}

// Wraps an already-open descriptor supplied by the caller.
Socket::Socket(int sockfd) {
  sockfd_ = sockfd;
}
// Closes the wrapped descriptor. A wrapper holding -1 (e.g. after a failed
// Accept() was stored via setSockfd) owns nothing, so close() is skipped
// instead of being called on an invalid descriptor.
Socket::~Socket() {
  if (sockfd_ != -1) {
    ::close(sockfd_);
    sockfd_ = -1;
  }
}

// Returns the wrapped descriptor.
int Socket::sockfd() const {
  const int fd = sockfd_;
  return fd;
}

void Socket::setSockfd(int sockfd) {
  sockfd_ = sockfd;
}

// Binds the socket to `local_addr`; CHECK reports a failed bind().
void Socket::BindAddress(const InetAddress &local_addr) {
  // bind() takes the generic sockaddr view of the IPv4 address.
  const sockaddr *generic_addr =
      reinterpret_cast<const sockaddr *>(local_addr.getAddress());
  const int ret = ::bind(sockfd_, generic_addr, sizeof(sockaddr_in));
  // Leading space keeps the descriptor number separate from "failed.".
  CHECK(ret != -1) << "Bind sockfd: " << sockfd_ << " failed.";
}

// Puts the socket into listening state with the system maximum backlog.
void Socket::Listen() {
  const int status = listen(sockfd_, SOMAXCONN);
  CHECK(status != -1) << "Failed Listen fd: " << sockfd_;
}

// Accepts one pending connection on the listening socket.
//
// @param peer_addr Optional; when non-null and accept() succeeds, receives
//                  the client's address.
// @return The new client descriptor, or -1 when accept() fails (errno is
//         left as set by accept()).
int Socket::Accept(InetAddress *peer_addr) {
  sockaddr_in clnt_addr;
  memset(&clnt_addr, 0, sizeof(clnt_addr));
  // accept() reads this value as the size of the supplied buffer, so it must
  // be initialised before the call.
  socklen_t addr_len = sizeof(clnt_addr);
  const int clnt_sockfd =
      ::accept(sockfd_, reinterpret_cast<sockaddr *>(&clnt_addr), &addr_len);
  if (clnt_sockfd != -1 && peer_addr != nullptr) {
    peer_addr->setAddress(clnt_addr);
  }
  return clnt_sockfd;
}

// Closes the write half of the connection; a failure is only logged.
void Socket::ShutdownWrite() {
  if (shutdown(sockfd_, SHUT_WR) >= 0) {
    return;
  }
  LogError("Socket::ShutdownWrite Error.");
}

// Turns TCP_NODELAY on or off. The setsockopt() result is not inspected.
void Socket::setTcpNoDelay(bool on) {
  int flag = 0;
  if (on) {
    flag = 1;
  }
  setsockopt(sockfd_, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag));
}

// Turns SO_REUSEPORT on or off. The setsockopt() result is not inspected.
void Socket::setReusePort(bool on) {
  int flag = 0;
  if (on) {
    flag = 1;
  }
  setsockopt(sockfd_, SOL_SOCKET, SO_REUSEPORT, &flag, sizeof(flag));
}

// Turns SO_REUSEADDR on or off. The setsockopt() result is not inspected.
void Socket::setReuseAddr(bool on) {
  int flag = 0;
  if (on) {
    flag = 1;
  }
  setsockopt(sockfd_, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof(flag));
}

// Turns SO_KEEPALIVE on or off. The setsockopt() result is not inspected.
void Socket::setKeepAlive(bool on) {
  int flag = 0;
  if (on) {
    flag = 1;
  }
  setsockopt(sockfd_, SOL_SOCKET, SO_KEEPALIVE, &flag, sizeof(flag));
}

// Adds O_NONBLOCK to the descriptor's current file status flags.
void Socket::setNonBlocking() {
  const int current_flags = fcntl(sockfd_, F_GETFL);
  fcntl(sockfd_, F_SETFL, current_flags | O_NONBLOCK);
}
  • Epoller封装头文件
/*
 * @Author: chenjingyu
 * @Date: 2024-09-06 16:24:09
 * @Contact: 2458006466@qq.com
 * @Description: Epoller
 */
#pragma once
#include <vector>
#include <sys/epoll.h>

class Channel;

/// Wrapper around one epoll instance plus the buffer that epoll_wait fills.
class Epoller {
public:
  /// Creates the epoll instance and allocates the event buffer.
  Epoller();
  /// Closes the epoll descriptor and frees the event buffer.
  ~Epoller();

  /// Registers `fd` with the event mask `op` (e.g. EPOLLIN | EPOLLET).
  void AddFd(int fd, uint32_t op);

  /// Waits for events and returns a copy of the ready ones.
  /// @param timeout Passed straight to epoll_wait; -1 by default.
  std::vector<epoll_event> Poll(int timeout = -1);

  /// Adds or modifies the registration for `channel`.
  void UpdateChannel(Channel *channel);

  /// Waits for events and returns the channels attached to the ready ones.
  /// @param timeout Passed straight to epoll_wait; -1 by default.
  std::vector<Channel *> PollChannels(int timeout = -1);

private:
  int epfd_;           // epoll descriptor
  epoll_event *evts_;  // buffer handed to epoll_wait
};
  • Epoller实现文件
/*
 * @Author: chenjingyu
 * @Date: 2024-09-06 16:24:13
 * @Contact: 2458006466@qq.com
 * @Description: Epoller
 */
#include "Epoller.h"
#include "Logger.h"
#include <unistd.h>
#include <string.h>

#define MAX_EVENTS 1000
// Creates the epoll instance, then allocates and zeroes a buffer with room
// for MAX_EVENTS events. CHECK reports a failed epoll_create1().
Epoller::Epoller() : epfd_(epoll_create1(0)), evts_(nullptr) {
  CHECK(epfd_ != -1) << "epoll create error.";
  evts_ = new epoll_event[MAX_EVENTS];
  memset(evts_, 0, sizeof(epoll_event) * MAX_EVENTS);
}

// Releases the epoll descriptor (if one was created) and the event buffer.
Epoller::~Epoller() {
  const bool has_epoll_fd = (epfd_ != -1);
  if (has_epoll_fd) {
    close(epfd_);
  }
  epfd_ = -1;

  delete[] evts_;
  evts_ = nullptr;
}

void Epoller::AddFd(int fd, uint32_t op) {
  struct epoll_event ev;
  memset(&ev, 0, sizeof(ev));
  ev.data.fd = fd;
  ev.events = op;
  int ret = epoll_ctl(epfd_, EPOLL_CTL_ADD, fd, &ev);
  CHECK(ret != -1) << "epoll add event error.";
}

std::vector<epoll_event> Epoller::Poll(int timeout) {
  std::vector<epoll_event> result;
  int nfds = epoll_wait(epfd_, evts_, MAX_EVENTS, timeout);
  CHECK(nfds != -1) << "epoll wait error.";
  for (int i = 0; i < nfds; ++i) {
    result.emplace_back(evts_[i]);
  }
  return result;
}

// Registers `channel` with epoll, or updates its event mask when the channel
// reports that it is already registered. The channel pointer is stored as the
// event's user data.
// NOTE(review): nothing here marks the channel as registered after a
// successful ADD -- confirm that Channel or the caller does so.
void Epoller::UpdateChannel(Channel *channel) {
  const int sockfd = channel->sockfd();

  struct epoll_event ev;
  memset(&ev, 0, sizeof(ev));
  ev.events = channel->events();
  ev.data.ptr = channel;

  const bool already_registered = channel->inEpoller();
  if (already_registered) {
    LogInfo("epoll modify {}.", sockfd);
    const int rc = epoll_ctl(epfd_, EPOLL_CTL_MOD, sockfd, &ev);
    CHECK(rc != -1) << "epoll modify error.";
    return;
  }

  LogInfo("epoll add {}.", sockfd);
  const int rc = epoll_ctl(epfd_, EPOLL_CTL_ADD, sockfd, &ev);
  CHECK(rc != -1) << "epoll add error.";
}

std::vector<Channel*> Epoller::PollChannels(int timeout) {
  std::vector<Channel*> result;
  int nfds = epoll_wait(epfd_, evts_, MAX_EVENTS, timeout);
  for (int i = 0; i < nfds; ++i) {
    Channel *channel = (Channel*) evts_[i].data.ptr;
    channel->setRevents(evts_[i].events);
    result.emplace_back(channel);
  }
  return result;
}
  • 测试代码:
/*
 * @Author: chenjingyu
 * @Date: 2024-09-06 14:46:11
 * @Contact: 2458006466@qq.com
 * @Description: TestEpoller
 */
#include "Loggerr.h"
#include "Epoller.h"
#include "InetAddress.h"
#include "Socket.h"
#include <string.h>
#include <unistd.h>

#define READ_BUFFER 1024
void HandleReadEvent(int);

int main(int argc, char *argv[]) {
  Socket socket;
  InetAddress addr(8080, "0.0.0.0");
  socket.BindAddress(addr);
  socket.Listen();
  socket.setNonBlocking();

  Epoller epoller;
  epoller.AddFd(socket.sockfd(), EPOLLIN | EPOLLET);

  InetAddress clnt_addr;
  Socket clnt_socket;
  while (true) {
    std::vector<epoll_event> evts = epoller.Poll();
    int nfds = evts.size();
    for (int i = 0; i < nfds; ++i) {
      if (evts[i].data.fd == socket.sockfd()) {
        // 新客户端连接
        clnt_socket.setSockfd(socket.Accept(&clnt_addr));
        LogInfo("new client fd: {}, IP: {}, Port: {}.", clnt_socket.sockfd(),
                clnt_addr.toIp(), clnt_addr.toPort());
        clnt_socket.setNonBlocking();
        epoller.AddFd(clnt_socket.sockfd(), EPOLLIN | EPOLLET);
      } else if (evts[i].events & EPOLLIN) {
        // 可读事件
        HandleReadEvent(evts[i].data.fd);
      } else {
        LogInfo("something else happened.");
      }
    }
  }
  return 0;
}

void HandleReadEvent(int sockfd) {
  char buf[READ_BUFFER];
  // 由于使用非阻塞IO,读取客户端buffer,一次读取buf大小数据,直到全部读取完毕
  while (true) {
    bzero(&buf, sizeof(buf));
    ssize_t bytes_read = read(sockfd, buf, sizeof(buf));
    if (bytes_read > 0) {
      LogInfo("message from client fd {}: {}", sockfd, buf);
      write(sockfd, buf, sizeof(buf));
    } else if (bytes_read == -1 && errno == EINTR) {
      // 客户端正常中断、继续读取
      LogInfo("continue reading");
      continue;
    } else if (bytes_read == -1 &&
               ((errno == EAGAIN) || (errno == EWOULDBLOCK))) {
      // 非阻塞IO,这个条件表示数据全部读取完毕
      LogInfo("finish reading once, errno: {}", errno);
      break;
    } else if (bytes_read == 0) {
      // EOF,客户端断开连接
      LogInfo("EOF, client fd {} disconnected", sockfd);
      close(sockfd); //关闭socket会自动将文件描述符从epoll树上移除
      break;
    }
  }
}
  • 编译 Makefile文件
# Neither target names a file that the recipe creates ("server" builds the
# binary "Server"), so both are declared phony.
.PHONY: server clean

# Build the echo server binary.
server:
	g++ Server.cc Logger.cc InetAddress.cc Epoller.cc Socket.cc -o Server

# Remove the binary; -f keeps this from failing when it does not exist.
clean:
	rm -f Server
  • 测试
# 服务端启动
>> ./Server
# 测试连接
>> nc 127.0.0.1 8080

参考资料:


评论