设为首页 加入收藏

TOP

Linux网络编程--epoll 模型原理详解以及实例(二)
2015-11-10 13:45:30 来源: 作者: 【 】 浏览:8
Tags:Linux 网络编程 --epoll 模型 原理 详解 以及 实例
uct epoll_event ev, *events;
for(;;)
{
? nfds = epoll_wait(kdpfd, events, maxevents, -1);? ? //等待IO事件
? for(n = 0; n < nfds; ++n)
? {
? //如果是主socket的事件,则表示有新连接进入,需要进行新连接的处理。
? ? ? if(events[n].data.fd == listener)
? ? ? {
? ? ? ? client = accept(listener, (struct sockaddr *) &local,? &addrlen);
if(client < 0)
? ? ? ? {
? ? ? ? ? ? perror("accept error");
? ? ? ? ? ? continue;
? ? ? ? }
? ? ? ? // 将新连接置于非阻塞模式
? ? ? ? setnonblocking(client);
? ? ? ? ev.events = EPOLLIN | EPOLLET;
? ? ? ? //注意这里的参数EPOLLIN | EPOLLET并没有设置对写socket的监听,
? ? ? ? //如果有写操作的话,这个时候epoll是不会返回事件的,
? ? ? ? //如果要对写操作也监听的话,应该是EPOLLIN | EPOLLOUT | EPOLLET。
? ? ? ? // 并且将新连接也加入EPOLL的监听队列
? ? ? ? ev.data.fd = client;
? ? ? ? // 设置好event之后,将这个新的event通过epoll_ctl
? ? ? ? if (epoll_ctl(kdpfd, EPOLL_CTL_ADD, client, &ev) < 0)
? ? ? ? {
? ? ? ? ? ? //加入到epoll的监听队列里,这里用EPOLL_CTL_ADD
? ? ? ? ? ? //来加一个新的 epoll事件。可以通过EPOLL_CTL_DEL来减少
? ? ? ? ? ? //一个epoll事件,通过EPOLL_CTL_MOD来改变一个事件的监听方式。
? ? ? ? ? ? fprintf(stderr, "epoll set insertion error: fd=%d"0, client);
? ? ? ? ? ? return -1;
? ? ? ? }
? ? ? }
? ? ? else
? ? ? // 如果不是主socket的事件的话,则代表这是一个用户的socket的事件,
? ? ? // 则用来处理这个用户的socket的事情是,比如说read(fd,xxx)之类,或者一些其他的处理。
? ? ? ? do_use_fd(events[n].data.fd);
? }
}8 EPOLL模型的简单实例
#include
#include
#include
#include
#include
#include
#include
#include


#define MAXLINE 10
#define OPEN_MAX 100
#define LISTENQ 20
#define SERV_PORT 5555
#define INFTIM 1000


void setnonblocking(int sock)
{
? int opts;
? opts = fcntl(sock, F_GETFL);
? if(opts < 0)
? {
? ? ? perror("fcntl(sock, GETFL)");
? ? ? exit(1);
? }
? opts = opts | O_NONBLOCK;
? if(fcntl(sock, F_SETFL, opts) < 0)
? {
? ? ? perror("fcntl(sock,SETFL,opts)");
? ? ? exit(1);
? }
}


int main()
{
? int i, maxi, listenfd, connfd, sockfd, epfd, nfds;
? ssize_t n;
? char line[MAXLINE];
? socklen_t clilen;
? //声明epoll_event结构体的变量, ev用于注册事件, events数组用于回传要处理的事件
? struct epoll_event ev,events[20];
? //生成用于处理accept的epoll专用的文件描述符, 指定生成描述符的最大范围为256
? epfd = epoll_create(256);
? struct sockaddr_in clientaddr;
? struct sockaddr_in serveraddr;
? listenfd = socket(AF_INET, SOCK_STREAM, 0);


? setnonblocking(listenfd);? ? ? //把用于监听的socket设置为非阻塞方式
? ev.data.fd = listenfd;? ? ? ? ? //设置与要处理的事件相关的文件描述符
? ev.events = EPOLLIN | EPOLLET;? //设置要处理的事件类型
? epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &ev);? ? //注册epoll事件
? bzero(&serveraddr, sizeof(serveraddr));
? serveraddr.sin_family = AF_INET;
? char *local_addr = "200.200.200.204";
? inet_aton(local_addr, &(serveraddr.sin_addr));
? serveraddr.sin_port = htons(SERV_PORT);? //或者htons(SERV_PORT);
? bind(listenfd,(sockaddr *)&serveraddr, sizeof(serveraddr));
? listen(listenfd, LISTENQ);


? maxi = 0;
? for( ; ; )
? {
? ? ? nfds = epoll_wait(epfd, events, 20, 500); //等待epoll事件的发生
? ? ? for(i = 0; i < nfds; ++i)? ? ? ? ? ? ? ? //处理所发生的所有事件
? ? ? {
? ? ? ? if(events[i].data.fd == listenfd)? ? ? //监听事件
? ? ? ? {
? ? ? ? ? ? connfd = accept(listenfd, (sockaddr *)&clientaddr, &clilen);
? ? ? ? ? ? if(connfd < 0)
? ? ? ? ? ? {
? ? ? ? ? ? ? perror("connfd<0");
? ? ? ? ? ? ? exit(1);
? ? ? ? ? ? }
? ? ? ? ? ? setnonblocking(connfd);? ? ? ? ? //把客户端的socket设置为非阻塞方式
? ? ? ? ? ? char *str = inet_ntoa(clientaddr.sin_addr);
? ? ? ? ? ? std::cout << "connect from " << str? <? ? ? ? ? ? ev.data.fd=connfd;? ? ? ? ? ? ? ? //设置用于读操作的文件描述符
? ? ? ? ? ? ev.events=EPOLLIN | EPOLLET;? ? ? //设置用于注测的读操作事件
? ? ? ? ? ? epoll_ctl(epfd, EPOLL_CTL_ADD, connfd, &ev);
? ? ? ? ? ? //注册ev事件
? ? ? ? }
? ? ? ? else if(events[i].events&EPOLLIN)? ? ? //读事件
? ? ? ? {
? ? ? ? ? ? if ( (sockfd = events[i].data.fd) < 0)
? ? ? ? ? ? {
? ? ? ? ? ? ? continue;
? ? ? ? ? ? }
? ? ? ? ? ? if ( (n = read(sockfd, line, MAXLINE)) < 0) // 这里和IOCP不同
? ? ? ? ? ? {
? ? ? ? ? ? ? if (errno == ECONNRESET)
? ? ? ? ? ? ? {
? ? ? ? ? ? ? ? ? close(sockfd);
? ? ? ? ? ? ? ? ? events[i].data.fd = -1;
? ? ? ? ? ? ? }
? ? ? ? ? ? ? else
? ? ? ? ? ? ? {
? ? ? ? ? ? ? ? ? std::cout<<"readline error"<? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? ? ? else if (n == 0)
? ? ? ? ? ? {
? ? ? ? ? ? ? close(sockfd);
? ? ? ? ? ? ? events[i].data.fd = -1;
? ? ? ? ? ? }
? ? ? ? ? ? ev.data.fd=sockfd;? ? ? ? ? ? ? //设置用于写操作的文件描述符
? ? ? ? ? ? ev.events=EPOLLOUT | EPOLLET;? //设置用于注测的写操作事件
? ? ? ? ? ? //修改sockfd上要处理的事件为EPOLLOUT
? ? ? ? ? ? epoll_ctl(epfd, EPOLL_CTL_MOD, sockfd, &ev);
? ? ? ? }
? ? ? ? else if(events[i].events&EPOLLOUT)//写事件
? ? ? ? {
? ? ? ? ? ? sockfd = events[i].data.fd;
? ? ? ? ? ? write(sockfd, line, n);
? ? ? ? ? ? ev.data.fd = sockfd;? ? ? ? ? ? ? //设置用于读操作的文件描述符
? ? ? ? ? ? ev.events = EPOLLIN | EPOLLET;? ? //设置用于注册的读操作事件
? ? ? ? ? ? //修改sockfd上要处理的事件为EPOLIN
? ? ? ? ? ? epoll_ctl(epfd, EPOLL_CTL_MOD, sockfd, &ev);
? ? ? ? }
? ? ? }
? }
}9.epoll进阶思考
9.1. 问题来源
最近学习EPOLL模型,介绍中说将EPOLL与Windows IOCP模型进行比较,说其的优势在于解决了IOCP模型大量线程上下文切换的开销,于是可以看出,EPOLL模型不需要多线程,即单线程中可以处理EPOLL逻辑。如果引入多线程反而会引起一些问题。但是EPOLL模型的服务器端到底可以不可以用多线程技术,如果可以,改怎么取舍,这成了困扰我的问题。上网查了一下,有这样几种声音:
(1) “要么事件驱动(如epoll),要么多线程,要么多进程,把这几个综合起来使用,感觉更加麻烦。”;
(2) “单线程使用epoll,但是不能发挥多核;多线程不用epoll。”;
(3) “主通信线程使用epoll所有需要监控的FD,有事件交给多线程去处理”;
(4) “既然用了epoll, 那么线程就不应该看到fd, 而只看到的是一个一个的业务请求/响应; epoll将网络数据组装成业务数据后, 转交给业务线程进行处理。这就是常说的半同步半异步”。
我比较赞同上述(3)、(4)中的观点
EPOLLOUT只有在缓冲区已经满了,不可以发送了,过了一会儿缓冲区中有空间了,就会触发EPOLLOUT,而且只触发一次。如果你编写的程序的网络IO不大,一次写入的数据不多的时候,通常都是epoll_wait立刻就会触发 EPOLLOUT;如果你不调用 epoll,直接写 socket,那么情况就取决于这个socket的缓冲区是不是足够了。如果缓冲区足够,那么写就成功。如果缓冲区不足,那么取决你的socket是不是阻塞的,要么阻塞到写完成,要么出错返回。所以EPOLLOUT事件具有较大的随机性,ET模式一般只用于EPOLLIN, 很少用于EPOLLOUT。
9.2. 具体做法
(1) 主通信线程使用epoll所有需要监控的FD,负责监控listenfd和connfd,这里只监听EPOLLIN事件,不监听EPOLLOUT事件;
(2) 一旦从Client收到了数据以后,将其构造成一个消息,放入消息队列中;
(3) 若干工作线程竞争,从消息队列中取出消息并进行处理,然后把处理结果发送给客户端。发送客户端的操作由工作线程完成。直接进行write。write到EAGAIN或EWOULDBLOCK后,线程循环continue等待缓冲区队列
发送函数代码如下:


bool send_data(int connfd, char *pbuffer, unsigned int &len,int flag)
{
? if ((connfd < 0) || (0? == pbuffer))
? {
? ? ? return false;
? }


? int result = 0;
? int remain_size = (int) len;
? int send_size = 0;
? const char *p = pbuffer;


? time_t start_time = time(NULL);
? int time_out = 3;


? do
? {
? ? ? if (time(NULL) > start + time_out)
? ? ? {
? ? ? ? return false;
? ? ? }


? ? ? send_size = send(connfd, p, remain_size, flag);
? ? ? if (nSentSize < 0)
? ? ? {
? ? ? ? if ((errno == EAGAIN) || (errno == EWOULDBLOCK) || (errno == EINTR))
? ? ? ? {
? ? ? ? ? ? continue;
? ? ? ? }
? ? ? ? else
? ? ? ? {
? ? ? ? ? ? len -= remain_size;
? ? ? ? ? ? return false;
? ? ? ? }
? ? ? }


? ? ? p += send_size;
? ? ? remain_size -= send_size;
? }while(remain_size > 0);


? return true;
}10 epoll 实现服务器和客户端例子
最后我们用C++实现一个简单的客户端回射,所用到的代码文件是


net.h? server.cpp? client.cpp服务器端:epoll实现的,干两件事分别为:1.等待客户端的链接,2.接收来自客户端的数据并且回射;


客户端:select实现,干两件事为:1.等待键盘输入,2.发送数据到服务器端并且接收服务器端回射的数据;


/***********
net.h
***********/
#include


#ifndef _NET_H
#define _NET_H


#include
#include
#include


#include
#include
#include ? //epoll ways file
#include
#include ? ? //block and noblock


#include
#include
#include
#include
#include
#include
#include


using namespace std;



#define hand_error(msg) do{perror(msg); exit(EXIT_FAILURE);}while(0)
#endif



/***********
server.c
***********/
#include "net.h"
#define MAX_EVENTS 10000


int setblock(int sock)
{
? ? int ret =? fcntl(sock, F_SETFL, 0);
? ? if (ret < 0 )
? ? ? ? hand_error("setblock");
? ? return 0;
}
int setnoblock(int sock)? //设置非阻塞模式
{
? ? int ret = fcntl(sock,? F_SETFL, O_NONBLOCK );
? ? if(ret < 0)
? ? ? ? hand_error("setnoblock");
? ? return 0;
}


int main()
{
? ? signal(SIGPIPE,SIG_IGN);
? int listenfd;
? ? listenfd = socket( AF_INET, SOCK_STREAM,0 );? //create a socket stream
? ? if( listenfd < 0 )
? ? ? ? hand_error( "socket_create");
? ? setnoblock(listenfd);
? ? int on = 1;
? ? if( setsockopt( listenfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on))< 0)
? ? ? ? hand_error("setsockopt");


? ? struct sockaddr_in my_addr;
? ? memset(&my_addr, 0, sizeof(my_addr));
? ? my_addr.sin_family = AF_INET;
? ? my_addr.sin_port = htons(18000);? //here is host? sequeue
? ? my_addr.sin_addr.s_addr = inet_addr("127.0.0.1");


? ? if( bind( listenfd, (struct sockaddr *)&my_addr, sizeof(my_addr)) < 0)
? ? ? ? hand_error("bind");


? ? int lisId = listen(listenfd, SOMAXCONN);
? ? if( lisId < 0)? //LISTEN
? ? ? ? hand_error("listen");


? ? struct sockaddr_in peer_addr;? //用来 save client addr
? ? socklen_t peerlen;?
? ? //下面是一些初始化,都是关于epoll的。
? ? vector clients;
? ? int count = 0;
? ? int cli_sock = 0;
? ? int epfd = 0;? //epoll 的文件描述符
? ? int ret_events;? //epoll_wait()的返回值
? struct epoll_event ev_remov, ev, events[MAX_EVENTS];? //events 用来存放从内核读取的的事件
? ? ev.events = EPOLLET | EPOLLIN;? //边缘方式触发
? ? ev.data.fd = listenfd;


? ? epfd = epoll_create(MAX_EVENTS);? //create epoll,返回值为epoll的文件描述符
? ? //epfd = epoll_create(EPOLL_CLOEXEC);? //新版写法
? ? if(epfd < 0)
? ? ? ? hand_error("epoll_create");
? ? int ret = epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &ev);? //添加时间
? ? if(ret < 0)
? ? ? ? hand_error("epoll_ctl");



? ? while(1)
? ? {
? ? ? ? ret_events = epoll_wait(epfd, events, MAX_EVENTS, -1);? //类似于select函数,这里是等待事件的到来。
? ? ? ? if(ret_events == -1)
? ? ? ? {
? ? ? ? ? ? cout<<"ret_events = "<? ? ? ? ? ? hand_error("epoll_wait");
? ? ? ? }


? ? ? ? if( ret_events == 0)
? ? ? ? {
? ? ? ? ? ? cout<<"ret_events = "<? ? ? ? ? ? continue;
? ? ? ? }


//? ? ? cout<<"ret_events = "<? ? ? ? for( int num = 0; num < ret_events; num ++)
? ? ? ? {
? ? ? ? ? ? cout<<"num = "<? ? ? ? ? ? cout<<"events[num].data.fd = "<? ? ? ? ? ? if(events[num].data.fd == listenfd) //client connect
? ? ? ? ? ? {
? ? ? ? ? ? ? ? cout<<"listen sucess and listenfd = "<? ? ? ? ? ? ? ? cli_sock = accept(listenfd, (struct sockaddr*)&peer_addr, &peerlen);
? ? ? ? ? ? ? ? if(cli_sock < 0)
? ? ? ? ? ? ? ? ? ? hand_error("accept");
? ? ? ? ? ? ? ? cout<<"count = "<? ? ? ? ? ? ? ? printf("ip=%s,port = %d\n", inet_ntoa(peer_addr.sin_addr),peer_addr.sin_port);
? ? ? ? ? ? ? ? clients.push_back(cli_sock);
? ? ? ? ? ? ? ? setnoblock(cli_sock);? //设置为非阻塞模式
? ? ? ? ? ? ? ? ev.data.fd = cli_sock;// 将新连接也加入EPOLL的监听队列
? ? ? ? ? ? ? ? ev.events = EPOLLIN | EPOLLET ;
? ? ? ? ? ? ? ? if(epoll_ctl(epfd, EPOLL_CTL_ADD, cli_sock, &ev)< 0)
? ? ? ? ? ? ? ? ? ? hand_error("epoll_ctl");
? ? ? ? ? ? }


? ? ? ? ? ? else if( events[num].events & EPOLLIN)
? ? ? ? ? ? {
? ? ? ? ? ? ? ? cli_sock = events[num].data.fd;
? ? ? ? ? ? ? ? if(cli_sock < 0)
? ? ? ? ? ? ? ? ? ? hand_error("cli_sock");
? ? ? ? ? ? ? ? char recvbuf[1024];
? ? ? ? ? ? ? ? memset(recvbuf, 0 , sizeof(recvbuf));
? ? ? ? ? ? ? ? int num = read( cli_sock, recvbuf, sizeof(recvbuf));
? ? ? ? ? ? ? ? if(num == -1)
? ? ? ? ? ? ? ? ? ? hand_error("read have some problem:");
? ? ? ? ? ? ? ? if( num == 0 )? //stand of client have exit
? ? ? ? ? ? ? ? {
? ? ? ? ? ? ? ? ? ? cout<<"client have exit"<? ? ? ? ? ? ? ? ? ? close(cli_sock);
? ? ? ? ? ? ? ? ? ? ev_remov = events[num];
? ? ? ? ? ? ? ? ? ? epoll_ctl(epfd, EPOLL_CTL_DEL, cli_sock, &ev_remov);
? ? ? ? ? ? ? ? ? ? clients.erase(remove(clients.begin(), clients.end(), cli_sock),clients.end());
? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? fputs(recvbuf,stdout);
? ? ? ? ? ? ? ? write(cli_sock, recvbuf, strlen(recvbuf));
? ? ? ? ? ? }
? ? ? ? }
? ? }


? ? return 0;
}


/***********
client.c
***********/


#include "net.h"


int main()
{
? ? signal(SIGPIPE,SIG_IGN);
? int sock;
? ? sock = socket( AF_INET, SOCK_STREAM,0 );? //create a socket stream
? ? if( sock< 0 )
? ? ? ? hand_error( "socket_create");


? ? struct sockaddr_in my_addr;


? ? //memset my_addr;
? ? memset(&my_addr, 0, sizeof(my_addr));
? ? my_addr.sin_family = AF_INET;
? ? my_addr.sin_port = htons(18000);? //here is host sequeue
//? my_addr.sin_addr.s_addr = htonl( INADDR_ANY );
? ? my_addr.sin_addr.s_addr = inet_addr("127.0.0.1");


? ? int conn = connect(sock, (struct sockaddr *)&my_addr, sizeof(my_addr)) ;
? ? if(conn != 0)
? ? ? ? hand_error("connect");


? ? char recvbuf[1024] = {0};
? ? char sendbuf[1024] = {0};
? ? fd_set rset;
? ? FD_ZERO(&rset);? ?


? ? int nready = 0;
? ? int maxfd;
? ? int stdinof = fileno(stdin);
? ? if( stdinof > sock)
? ? ? ? maxfd = stdinof;
? ? else
? ? ? ? maxfd = sock;
? ? while(1)
? ? {
? ? ? ? //select返回后把原来待检测的但是仍没就绪的描述字清0了。所以每次调用select前都要重新设置一下待检测的描述字
? ? ? ? FD_SET(sock, &rset);?
? ? ? ? FD_SET(stdinof, &rset);
? ? ? ? nready = select(maxfd+1, &rset, NULL, NULL, NULL);
? ? ? ? cout<<"nready = "<? ? ? ? ? ? ? ? else if( ret == 0)
? ? ? ? ? ? ? ? {
? ? ? ? ? ? ? ? ? ? cout<<"sever have close"<? ? ? ? ? ? ? ? ? ? close(sock);
? ? ? ? ? ? ? ? ? ? break;
? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? else
? ? ? ? ? ? ? ? {
? ? ? ? ? ? ? ? ? ? fputs(recvbuf,stdout);? ? //输出数据
? ? ? ? ? ? ? ? ? ? memset(recvbuf, 0, strlen(recvbuf));
? ? ? ? ? ? ? ? }?
? ? ? ? ? ? }


? ? ? ? ? ? if( FD_ISSET(stdinof, &rset))? //检测stdin的文件描述符是否在集合里面
? ? ? ? ? ? {?
? ? ? ? ? ? ? ? if(fgets(sendbuf, sizeof(sendbuf), stdin) != NULL)
? ? ? ? ? ? ? ? {
? ? ? ? ? ? ? ? ? ? int num = write(sock, sendbuf, strlen(sendbuf));? //写数据
? ? ? ? ? ? ? ? ? ? cout<<"sent num = "<? ? ? ? ? ? ? ? ? ? memset(sendbuf, 0, sizeof(sendbuf));
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? }
? ? }
? ? return 0;
}


首页 上一页 1 2 下一页 尾页 2/2/2
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
分享到: 
上一篇Linux Shell脚本 多线程 下一篇Linux C 源码(nMAsciiHexToBinar..

评论

帐  号: 密码: (新用户注册)
验 证 码:
表  情:
内  容: