IO多路复用
IO多路复用
1. IO多路复用概述
IO多路复用是一种同时监控多个文件描述符的机制,允许单个进程处理多个IO操作。主要解决高并发场景下的IO效率问题。
1.1 核心概念
- 文件描述符(File Descriptor): Linux中一切皆文件,socket、设备等都通过fd访问
- 就绪通知(Ready Notification): 当fd可读、可写或出现异常时通知应用程序
- 事件驱动(Event-Driven): 基于事件触发而非轮询
1.2 主要技术
- select: 最古老的多路复用机制
- poll: select的改进版本
- epoll: Linux特有的高性能多路复用
2. select系统调用
2.1 select基本原理
#include <sys/select.h>
int select(int nfds, fd_set *readfds, fd_set *writefds,
           fd_set *exceptfds, struct timeval *timeout);
// 相关操作宏
void FD_ZERO(fd_set *set);           // 清空集合
void FD_SET(int fd, fd_set *set);    // 添加fd到集合
void FD_CLR(int fd, fd_set *set);    // 从集合移除fd
int FD_ISSET(int fd, fd_set *set);   // 检查fd是否在集合中
2.2 select使用示例
#include <stdio.h>
#include <stdlib.h>
#include <sys/select.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
#include <string.h>
#define MAX_FD 1024
int main() {
    fd_set readfds;
    struct timeval timeout;
    int ret;
    
    while (1) {
        // 清空文件描述符集合
        FD_ZERO(&readfds);
        
        // 添加标准输入到监控集合
        FD_SET(STDIN_FILENO, &readfds);
        
        // 设置超时时间为5秒
        timeout.tv_sec = 5;
        timeout.tv_usec = 0;
        
        // 调用select
        ret = select(STDIN_FILENO + 1, &readfds, NULL, NULL, &timeout);
        
        if (ret == -1) {
            perror("select");
            break;
        } else if (ret == 0) {
            printf("Timeout occurred! No data after 5 seconds.\n");
        } else {
            // 检查标准输入是否可读
            if (FD_ISSET(STDIN_FILENO, &readfds)) {
                char buf[1024];
                ssize_t n = read(STDIN_FILENO, buf, sizeof(buf) - 1);
                if (n > 0) {
                    buf[n] = '\0';
                    printf("Read from stdin: %s", buf);
                }
            }
        }
    }
    
    return 0;
}
2.3 select的局限性
- 文件描述符数量限制(FD_SETSIZE,通常1024)
- 每次调用需要重新设置fd_set
- 线性扫描所有fd,效率随fd数量增加而下降
3. poll系统调用
3.1 poll基本原理
#include <poll.h>
struct pollfd {
    int fd;           // 文件描述符
    short events;     // 请求的事件
    short revents;    // 返回的事件
};
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
// 事件标志
#define POLLIN      0x001   // 有数据可读
#define POLLPRI     0x002   // 有紧急数据可读
#define POLLOUT     0x004   // 可写,不会阻塞
#define POLLERR     0x008   // 发生错误
#define POLLHUP     0x010   // 挂起
#define POLLNVAL    0x020   // 无效的fd
3.2 poll使用示例
#include <stdio.h>
#include <stdlib.h>
#include <poll.h>
#include <unistd.h>
#include <string.h>
#define MAX_FDS 10
int main() {
    struct pollfd fds[2];
    int timeout = 5000; // 5秒超时
    int ret;
    
    // 监控标准输入
    fds[0].fd = STDIN_FILENO;
    fds[0].events = POLLIN;
    
    // 监控标准输出(通常总是可写)
    fds[1].fd = STDOUT_FILENO;
    fds[1].events = POLLOUT;
    
    while (1) {
        ret = poll(fds, 2, timeout);
        
        if (ret == -1) {
            perror("poll");
            break;
        } else if (ret == 0) {
            printf("Poll timeout after 5 seconds.\n");
        } else {
            // 检查每个文件描述符
            for (int i = 0; i < 2; i++) {
                if (fds[i].revents & POLLIN) {
                    printf("fd %d is ready for reading\n", fds[i].fd);
                    char buf[1024];
                    ssize_t n = read(fds[i].fd, buf, sizeof(buf) - 1);
                    if (n > 0) {
                        buf[n] = '\0';
                        printf("Read: %s", buf);
                    }
                }
                
                if (fds[i].revents & POLLOUT) {
                    printf("fd %d is ready for writing\n", fds[i].fd);
                }
                
                if (fds[i].revents & POLLERR) {
                    printf("Error on fd %d\n", fds[i].fd);
                }
            }
        }
    }
    
    return 0;
}
4. epoll系统调用
4.1 epoll基本原理
epoll是Linux特有的高性能IO多路复用机制,解决了select/poll的性能问题。
#include <sys/epoll.h>
// 创建epoll实例
int epoll_create(int size);
int epoll_create1(int flags);
// 控制epoll事件
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
// 等待事件
int epoll_wait(int epfd, struct epoll_event *events,
               int maxevents, int timeout);
struct epoll_event {
    uint32_t events;    // epoll事件
    epoll_data_t data;  // 用户数据
};
typedef union epoll_data {
    void *ptr;
    int fd;
    uint32_t u32;
    uint64_t u64;
} epoll_data_t;
// 事件标志
#define EPOLLIN     0x001   // 可读
#define EPOLLOUT    0x004   // 可写
#define EPOLLET     0x800   // 边缘触发
#define EPOLLONESHOT 0x4000000 // 一次性事件
4.2 epoll使用示例
#include <stdio.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#define MAX_EVENTS 10
int main() {
    int epoll_fd;
    struct epoll_event event, events[MAX_EVENTS];
    int ret;
    
    // 创建epoll实例
    epoll_fd = epoll_create1(0);
    if (epoll_fd == -1) {
        perror("epoll_create1");
        exit(EXIT_FAILURE);
    }
    
    // 添加标准输入到epoll监控
    event.events = EPOLLIN;
    event.data.fd = STDIN_FILENO;
    if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, STDIN_FILENO, &event) == -1) {
        perror("epoll_ctl");
        close(epoll_fd);
        exit(EXIT_FAILURE);
    }
    
    printf("Epoll started, monitoring stdin...\n");
    
    while (1) {
        // 等待事件,超时5秒
        ret = epoll_wait(epoll_fd, events, MAX_EVENTS, 5000);
        
        if (ret == -1) {
            perror("epoll_wait");
            break;
        } else if (ret == 0) {
            printf("No events within 5 seconds.\n");
            continue;
        }
        
        // 处理所有就绪的事件
        for (int i = 0; i < ret; i++) {
            if (events[i].events & EPOLLIN) {
                printf("File descriptor %d is ready for reading\n", 
                       events[i].data.fd);
                
                char buf[1024];
                ssize_t n = read(events[i].data.fd, buf, sizeof(buf) - 1);
                if (n > 0) {
                    buf[n] = '\0';
                    printf("Read %zd bytes: %s", n, buf);
                    
                    // 如果是"quit"则退出
                    if (strncmp(buf, "quit", 4) == 0) {
                        printf("Exiting...\n");
                        close(epoll_fd);
                        return 0;
                    }
                } else if (n == 0) {
                    printf("File descriptor %d closed\n", events[i].data.fd);
                    // 从epoll中移除
                    epoll_ctl(epoll_fd, EPOLL_CTL_DEL, events[i].data.fd, NULL);
                }
            }
            
            if (events[i].events & EPOLLERR) {
                printf("Error on file descriptor %d\n", events[i].data.fd);
            }
        }
    }
    
    close(epoll_fd);
    return 0;
}
4.3 水平触发 vs 边缘触发
// 水平触发(默认)
event.events = EPOLLIN;  // 水平触发
// 边缘触发
event.events = EPOLLIN | EPOLLET;  // 边缘触发
水平触发(Level-Triggered):
- 只要文件描述符可读/可写,就会持续通知
- 类似于select/poll的行为
- 编程更简单,不容易丢失事件
边缘触发(Edge-Triggered):
- 只有当文件描述符状态发生变化时通知
- 需要一次性读取所有可用数据
- 性能更好,但编程更复杂
4.4 边缘触发示例
#include <stdio.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <fcntl.h>
#define MAX_EVENTS 10
#define BUFFER_SIZE 1024
// 设置文件描述符为非阻塞
int set_nonblocking(int fd) {
    int flags = fcntl(fd, F_GETFL, 0);
    if (flags == -1) return -1;
    return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}
int main() {
    int epoll_fd;
    struct epoll_event event, events[MAX_EVENTS];
    int ret;
    
    epoll_fd = epoll_create1(0);
    if (epoll_fd == -1) {
        perror("epoll_create1");
        exit(EXIT_FAILURE);
    }
    
    // 设置标准输入为非阻塞
    if (set_nonblocking(STDIN_FILENO) == -1) {
        perror("set_nonblocking");
        close(epoll_fd);
        exit(EXIT_FAILURE);
    }
    
    // 使用边缘触发
    event.events = EPOLLIN | EPOLLET;
    event.data.fd = STDIN_FILENO;
    
    if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, STDIN_FILENO, &event) == -1) {
        perror("epoll_ctl");
        close(epoll_fd);
        exit(EXIT_FAILURE);
    }
    
    printf("Epoll ET mode started...\n");
    
    while (1) {
        ret = epoll_wait(epoll_fd, events, MAX_EVENTS, 5000);
        
        if (ret == -1) {
            perror("epoll_wait");
            break;
        } else if (ret == 0) {
            printf("Timeout...\n");
            continue;
        }
        
        for (int i = 0; i < ret; i++) {
            if (events[i].events & EPOLLIN) {
                printf("EPOLLIN event on fd %d\n", events[i].data.fd);
                
                // 边缘触发需要读取所有可用数据
                while (1) {
                    char buf[BUFFER_SIZE];
                    ssize_t n = read(events[i].data.fd, buf, sizeof(buf) - 1);
                    
                    if (n > 0) {
                        buf[n] = '\0';
                        printf("Read %zd bytes: %s", n, buf);
                    } else if (n == -1) {
                        if (errno == EAGAIN || errno == EWOULDBLOCK) {
                            // 没有更多数据可读
                            printf("No more data to read\n");
                            break;
                        } else {
                            perror("read");
                            break;
                        }
                    } else { // n == 0
                        printf("File descriptor closed\n");
                        epoll_ctl(epoll_fd, EPOLL_CTL_DEL, events[i].data.fd, NULL);
                        break;
                    }
                }
            }
        }
    }
    
    close(epoll_fd);
    return 0;
}
5. 三种机制对比
| 特性 | select | poll | epoll | 
|---|---|---|---|
| 性能 | O(n) | O(n) | O(1) | 
| 最大fd数 | FD_SETSIZE(1024) | 无限制 | 无限制 | 
| 工作效率 | 随fd增加下降 | 随fd增加下降 | 高效 | 
| 内存使用 | 固定大小 | 动态分配 | 动态分配 | 
| 触发方式 | 水平触发 | 水平触发 | 水平/边缘触发 | 
| 内核通知 | 每次重建fd_set | 每次重建pollfd数组 | 事件驱动 | 
6. 实际应用:简单的TCP服务器
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <sys/epoll.h>
#include <errno.h>
#define MAX_EVENTS 64
#define PORT 8080
#define BUFFER_SIZE 1024
int main() {
    int server_fd, epoll_fd;
    struct sockaddr_in address;
    struct epoll_event event, events[MAX_EVENTS];
    int opt = 1;
    int addrlen = sizeof(address);
    
    // 创建服务器socket
    if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == 0) {
        perror("socket failed");
        exit(EXIT_FAILURE);
    }
    
    // 设置socket选项
    if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, 
                   &opt, sizeof(opt))) {
        perror("setsockopt");
        exit(EXIT_FAILURE);
    }
    
    address.sin_family = AF_INET;
    address.sin_addr.s_addr = INADDR_ANY;
    address.sin_port = htons(PORT);
    
    // 绑定socket
    if (bind(server_fd, (struct sockaddr *)&address, sizeof(address)) < 0) {
        perror("bind failed");
        exit(EXIT_FAILURE);
    }
    
    // 开始监听
    if (listen(server_fd, 10) < 0) {
        perror("listen");
        exit(EXIT_FAILURE);
    }
    
    printf("Server listening on port %d\n", PORT);
    
    // 创建epoll实例
    epoll_fd = epoll_create1(0);
    if (epoll_fd == -1) {
        perror("epoll_create1");
        exit(EXIT_FAILURE);
    }
    
    // 添加服务器socket到epoll
    event.events = EPOLLIN;
    event.data.fd = server_fd;
    if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, server_fd, &event) == -1) {
        perror("epoll_ctl: server_fd");
        exit(EXIT_FAILURE);
    }
    
    while (1) {
        int nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
        if (nfds == -1) {
            perror("epoll_wait");
            break;
        }
        
        for (int i = 0; i < nfds; i++) {
            if (events[i].data.fd == server_fd) {
                // 新的客户端连接
                int client_fd = accept(server_fd, 
                                     (struct sockaddr *)&address, 
                                     (socklen_t*)&addrlen);
                if (client_fd < 0) {
                    perror("accept");
                    continue;
                }
                
                printf("New client connected: fd=%d\n", client_fd);
                
                // 添加客户端socket到epoll
                event.events = EPOLLIN | EPOLLET; // 边缘触发
                event.data.fd = client_fd;
                if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, client_fd, &event) == -1) {
                    perror("epoll_ctl: client_fd");
                    close(client_fd);
                }
            } else {
                // 客户端数据可读
                int client_fd = events[i].data.fd;
                char buffer[BUFFER_SIZE];
                
                while (1) {
                    ssize_t count = read(client_fd, buffer, sizeof(buffer) - 1);
                    
                    if (count > 0) {
                        buffer[count] = '\0';
                        printf("Received from client %d: %s", client_fd, buffer);
                        
                        // 回显数据
                        write(client_fd, buffer, count);
                    } else if (count == 0) {
                        // 客户端断开连接
                        printf("Client %d disconnected\n", client_fd);
                        close(client_fd);
                        break;
                    } else {
                        if (errno == EAGAIN || errno == EWOULDBLOCK) {
                            // 没有更多数据
                            break;
                        } else {
                            perror("read");
                            close(client_fd);
                            break;
                        }
                    }
                }
            }
        }
    }
    
    close(server_fd);
    close(epoll_fd);
    return 0;
}
7. 最佳实践
- 选择合适的机制: - 少量连接:select/poll
- 大量连接:epoll
 
- 边缘触发注意事项: - 必须使用非阻塞IO
- 必须一次性读取所有数据
- 性能更好但编程更复杂
 
- 错误处理: - 总是检查系统调用返回值
- 处理EAGAIN/EWOULDBLOCK错误
- 及时关闭文件描述符
 
- 性能优化: - 使用边缘触发减少系统调用
- 合理设置缓冲区大小
- 避免不必要的内存拷贝
 
IO多路复用是现代高性能网络编程的核心技术,正确使用可以显著提高程序的并发处理能力。
            本文是原创文章,采用 CC BY-NC-ND 4.0 协议,完整转载请注明来自 恒星不见
        
     评论
            
                匿名评论
                隐私政策
            
            
                你无需删除空行,直接评论以获取最佳展示效果
            
         
            
        
