0
  • 聊天消息
  • 系统消息
  • 评论与回复
登录后你可以
  • 下载海量资料
  • 学习在线课程
  • 观看技术视频
  • 写文章/发帖/加入社区
创作中心

完善资料让更多小伙伴认识你,还能领取20积分哦,立即完善>

3天内不再提示

epoll LT和ET方式下的读写差别

马哥Linux运维 来源:马哥Linux运维 作者:马哥Linux运维 2022-07-07 10:34 次阅读

1f7621d6-fd2c-11ec-ba43-dac502259ad0.png

导语

epoll接口是为解决Linux内核处理大量文件描述符而提出的方案。该接口属于Linux下多路I/O复用接口中select/poll的增强。其经常应用于Linux下高并发服务型程序,特别是在大量并发连接中只有少部分连接处于活跃下的情况 (通常是这种情况),在该情况下能显著的提高程序的CPU利用率。本篇详细解读了epoll的用法,希望大家能有所收获!

正文

设想一个场景:有100万用户同时与一个进程保持着TCP连接,而每一时刻只有几十个或几百个TCP连接是活跃的(接收TCP包),也就是说在每一时刻进程只需要处理这100万连接中的一小部分连接。那么,如何才能高效的处理这种场景呢?进程是否在每次询问操作系统收集有事件发生的TCP连接时,把这100万个连接告诉操作系统,然后由操作系统找出其中有事件发生的几百个连接呢?实际上,在 Linux2.4 版本以前,那时的select 或者 poll 事件驱动方式是这样做的。

这里有个非常明显的问题,即在某一时刻,进程收集有事件的连接时,其实这100万连接中的大部分都是没有事件发生的。因此如果每次收集事件时,都把100万连接的套接字传给操作系统(这首先是用户态内存到内核态内存的大量复制),而由操作系统内核寻找这些连接上有没有未处理的事件,将会是巨大的资源浪费,然后select和poll就是这样做的,因此它们最多只能处理几千个并发连接。而epoll不这样做,它在Linux内核中申请了一个简易的文件系统,把原先的一个select或poll调用分成了3部分:

intepoll_create(intsize);
intepoll_ctl(intepfd,intop,intfd,structepoll_event*event);
intepoll_wait(intepfd,structepoll_event*events,intmaxevents,inttimeout);

调用 epoll_create 建立一个 epoll 对象(在epoll文件系统中给这个句柄分配资源);

调用 epoll_ctl 向 epoll 对象中添加这100万个连接的套接字;

调用 epoll_wait 收集发生事件的连接。

这样只需要在进程启动时建立 1 个 epoll 对象,并在需要的时候向它添加或删除连接就可以了,因此,在实际收集事件时,epoll_wait 的效率就会非常高,因为调用 epoll_wait 时并没有向它传递这100万个连接,内核也不需要去遍历全部的连接。

一、epoll原理详解

当某一进程调用 epoll_create 方法时,Linux 内核会创建一个 eventpoll 结构体,这个结构体中有两个成员与epoll的使用方式密切相关,如下所示:

structeventpoll{
...
/*红黑树的根节点,这棵树中存储着所有添加到epoll中的事件,
也就是这个epoll监控的事件*/
structrb_rootrbr;
/*双向链表rdllist保存着将要通过epoll_wait返回给用户的、满足条件的事件*/
structlist_headrdllist;
...
};

我们在调用 epoll_create 时,内核除了帮我们在 epoll 文件系统里建了个 file 结点,在内核 cache 里建了个红黑树用于存储以后 epoll_ctl 传来的 socket 外,还会再建立一个 rdllist 双向链表,用于存储准备就绪的事件,当 epoll_wait 调用时,仅仅观察这个 rdllist 双向链表里有没有数据即可。有数据就返回,没有数据就sleep,等到 timeout 时间到后即使链表没数据也返回。所以epoll_wait 非常高效。

所有添加到epoll中的事件都会与设备(如网卡)驱动程序建立回调关系,也就是说相应事件的发生时会调用这里的回调方法。这个回调方法在内核中叫做ep_poll_callback,它会把这样的事件放到上面的rdllist双向链表中。

在epoll中对于每一个事件都会建立一个epitem结构体,如下所示:

structepitem{
...
//红黑树节点
structrb_noderbn;
//双向链表节点
structlist_headrdllink;
//事件句柄等信息
structepoll_filefdffd;
//指向其所属的eventepoll对象
structeventpoll*ep;
//期待的事件类型
structepoll_eventevent;
...
};//这里包含每一个事件对应着的信息。

当调用 epoll_wait 检查是否有发生事件的连接时,只是检查eventpoll对象中的rdllist双向链表是否有epitem元素而已,如果rdllist链表不为空,则这里的事件复制到用户态内存(使用共享内存提高效率)中,同时将事件数量返回给用户。因此epoll_waitx效率非常高。epoll_ctl在向epoll对象中添加、修改、删除事件时,从rbr红黑树中查找事件也非常快,也就是说epoll是非常高效的,它可以轻易地处理百万级别的并发连接。

1f8702ee-fd2c-11ec-ba43-dac502259ad0.png

【总结】

一颗红黑树,一张准备就绪句柄链表,少量的内核cache,就帮我们解决了大并发下的socket处理问题.

执行epoll_create() 时,创建了红黑树和就绪链表;

执行 epoll_ctl() 时,如果增加 socket 句柄,则检查在红黑树中是否存在,存在立即返回,不存在则添加到树干上,然后向内核注册回调函数,用于当中断事件来临时向准备就绪链表中插入数据;

执行 epoll_wait() 时立刻返回准备就绪链表里的数据即可。

1fa9e318-fd2c-11ec-ba43-dac502259ad0.png

二、epoll 的两种触发模式

epoll有EPOLLLT和EPOLLET两种触发模式,LT是默认的模式,ET是“高速”模式。

LT(水平触发)模式下,只要这个文件描述符还有数据可读,每次 epoll_wait都会返回它的事件,提醒用户程序去操作;

ET(边缘触发)模式下,在它检测到有 I/O 事件时,通过 epoll_wait 调用会得到有事件通知的文件描述符,对于每一个被通知的文件描述符,如可读,则必须将该文件描述符一直读到空,让 errno 返回 EAGAIN 为止,否则下次的 epoll_wait 不会返回余下的数据,会丢掉事件。

如果ET模式不是非阻塞的,那这个一直读或一直写势必会在最后一次阻塞。

还有一个特点是,epoll使用“事件”的就绪通知方式,通过epoll_ctl注册fd,一旦该fd就绪,内核就会采用类似callback的回调机制来激活该fd,epoll_wait便可以收到通知。

1fbdc996-fd2c-11ec-ba43-dac502259ad0.png

【epoll为什么要有ET触发模式?】

如果采用 EPOLLLT 模式的话,系统中一旦有大量你不需要读写的就绪文件描述符,它们每次调用epoll_wait都会返回,这样会大大降低处理程序检索自己关心的就绪文件描述符的效率.。而采用EPOLLET这种边缘触发模式的话,当被监控的文件描述符上有可读写事件发生时,epoll_wait()会通知处理程序去读写。如果这次没有把数据全部读写完(如读写缓冲区太小),那么下次调用epoll_wait()时,它不会通知你,也就是它只会通知你一次,直到该文件描述符上出现第二次可读写事件才会通知你!!!这种模式比水平触发效率高,系统不会充斥大量你不关心的就绪文件描述符。

【总结】

ET模式(边缘触发)

只有数据到来才触发,不管缓存区中是否还有数据,缓冲区剩余未读尽的数据不会导致epoll_wait返回;

边沿触发模式很大程度上降低了同一个epoll事件被重复触发的次数,所以效率更高;

对于读写的connfd,边缘触发模式下,必须使用非阻塞IO,并要一次性全部读写完数据。

ET的编程可以做到更加简洁,某些场景下更加高效,但另一方面容易遗漏事件,容易产生bug;

LT 模式(水平触发,默认)

只要有数据都会触发,缓冲区剩余未读尽的数据会导致epoll_wait返回;

LT比ET多了一个开关EPOLLOUT事件(系统调用消耗,上下文切换)的步骤;

对于监听的sockfd,最好使用水平触发模式(参考nginx),边缘触发模式会导致高并发情况下,有的客户端会连接不上,LT适合处理紧急事件;

对于读写的connfd,水平触发模式下,阻塞和非阻塞效果都一样,不过为了防止特殊情况,还是建议设置非阻塞;

LT的编程与poll/select接近,符合一直以来的习惯,不易出错;

总之,各有优缺点,需要根据业务场景选择最合适的模式。

三、epoll反应堆模型

【epoll模型原来的流程】

epoll_create();//创建监听红黑树
epoll_ctl();//向书上添加监听fd
epoll_wait();//监听
有监听fd事件发送--->返回监听满足数组--->判断返回数组元素--->
lfd满足accept--->返回cfd---->read()读数据--->write()给客户端回应。

【epoll反应堆模型的流程】

epoll_create();//创建监听红黑树
epoll_ctl();//向书上添加监听fd
epoll_wait();//监听
有客户端连接上来--->lfd调用acceptconn()--->将cfd挂载到红黑树上监听其读事件--->
epoll_wait()返回cfd--->cfd回调recvdata()--->将cfd摘下来监听写事件--->
epoll_wait()返回cfd--->cfd回调senddata()--->将cfd摘下来监听读事件--->...--->
1fdcc8d2-fd2c-11ec-ba43-dac502259ad0.png

【Demo】

#include#include
#include
#include
#include#include#include#include#include#include#defineMAX_EVENTS1024/*监听上限*/
#defineBUFLEN4096/*缓存区大小*/
#defineSERV_PORT6666/*端口号*/

voidrecvdata(intfd,intevents,void*arg);
voidsenddata(intfd,intevents,void*arg);

/*描述就绪文件描述符的相关信息*/
structmyevent_s
{
intfd;//要监听的文件描述符
intevents;//对应的监听事件,EPOLLIN和EPLLOUT
void*arg;//指向自己结构体指针
void(*call_back)(intfd,intevents,void*arg);//回调函数
intstatus;//是否在监听:1->在红黑树上(监听),0->不在(不监听)
charbuf[BUFLEN];
intlen;
longlast_active;//记录每次加入红黑树g_efd的时间值
};

intg_efd;//全局变量,作为红黑树根
structmyevent_sg_events[MAX_EVENTS+1];//自定义结构体类型数组.+1-->listenfd

/*
*封装一个自定义事件,包括fd,这个fd的回调函数,还有一个额外的参数项
*注意:在封装这个事件的时候,为这个事件指明了回调函数,一般来说,一个fd只对一个特定的事件
*感兴趣,当这个事件发生的时候,就调用这个回调函数
*/
voideventset(structmyevent_s*ev,intfd,void(*call_back)(intfd,intevents,void*arg),void*arg)
{
ev->fd=fd;
ev->call_back=call_back;
ev->events=0;
ev->arg=arg;
ev->status=0;
if(ev->len<= 0)
    {
        memset(ev->buf,0,sizeof(ev->buf));
ev->len=0;
}
ev->last_active=time(NULL);//调用eventset函数的时间
return;
}

/*向epoll监听的红黑树添加一个文件描述符*/
voideventadd(intefd,intevents,structmyevent_s*ev)
{
structepoll_eventepv={0,{0}};
intop=0;
epv.data.ptr=ev;//ptr指向一个结构体(之前的epoll模型红黑树上挂载的是文件描述符cfd和lfd,现在是ptr指针)
epv.events=ev->events=events;//EPOLLIN或EPOLLOUT
if(ev->status==0)//status说明文件描述符是否在红黑树上0不在,1在
{
op=EPOLL_CTL_ADD;//将其加入红黑树g_efd,并将status置1
ev->status=1;
}
if(epoll_ctl(efd,op,ev->fd,&epv)< 0) // 添加一个节点
        printf("event add failed [fd=%d],events[%d]
", ev->fd,events);
else
printf("eventaddOK[fd=%d],events[%0X]
",ev->fd,events);
return;
}

/*从epoll监听的红黑树中删除一个文件描述符*/
voideventdel(intefd,structmyevent_s*ev)
{
structepoll_eventepv={0,{0}};
if(ev->status!=1)//如果fd没有添加到监听树上,就不用删除,直接返回
return;
epv.data.ptr=NULL;
ev->status=0;
epoll_ctl(efd,EPOLL_CTL_DEL,ev->fd,&epv);
return;
}

/*当有文件描述符就绪,epoll返回,调用该函数与客户端建立链接*/
voidacceptconn(intlfd,intevents,void*arg)
{
structsockaddr_incin;
socklen_tlen=sizeof(cin);
intcfd,i;
if((cfd=accept(lfd,(structsockaddr*)&cin,&len))==-1)
{
if(errno!=EAGAIN&&errno!=EINTR)
{
sleep(1);
}
printf("%s:accept,%s
",__func__,strerror(errno));
return;
}
do
{
for(i=0;i< MAX_EVENTS; i++) //从全局数组g_events中找一个空闲元素,类似于select中找值为-1的元素
        {
            if(g_events[i].status ==0)
                break;
        }
        if(i == MAX_EVENTS) // 超出连接数上限
        {
            printf("%s: max connect limit[%d]
", __func__, MAX_EVENTS);
            break;
        }
        int flag = 0;
        if((flag = fcntl(cfd, F_SETFL, O_NONBLOCK)) < 0) //将cfd也设置为非阻塞
        {
            printf("%s: fcntl nonblocking failed, %s
", __func__, strerror(errno));
            break;
        }
        eventset(&g_events[i], cfd, recvdata, &g_events[i]); //找到合适的节点之后,将其添加到监听树中,并监听读事件
        eventadd(g_efd, EPOLLIN, &g_events[i]);
    }while(0);

    printf("new connect[%s:%d],[time:%ld],pos[%d]",inet_ntoa(cin.sin_addr), ntohs(cin.sin_port), g_events[i].last_active, i);
    return;
}

/*读取客户端发过来的数据的函数*/
void recvdata(int fd, int events, void *arg)
{
    struct myevent_s *ev = (struct myevent_s *)arg;
    int len;

    len = recv(fd, ev->buf,sizeof(ev->buf),0);//读取客户端发过来的数据

eventdel(g_efd,ev);//将该节点从红黑树上摘除

if(len>0)
{
ev->len=len;
ev->buf[len]='�';//手动添加字符串结束标记
printf("C[%d]:%s
",fd,ev->buf);

eventset(ev,fd,senddata,ev);//设置该fd对应的回调函数为senddata
eventadd(g_efd,EPOLLOUT,ev);//将fd加入红黑树g_efd中,监听其写事件

}
elseif(len==0)
{
close(ev->fd);
/*ev-g_events地址相减得到偏移元素位置*/
printf("[fd=%d]pos[%ld],closed
",fd,ev-g_events);
}
else
{
close(ev->fd);
printf("recv[fd=%d]error[%d]:%s
",fd,errno,strerror(errno));
}
return;
}

/*发送给客户端数据*/
voidsenddata(intfd,intevents,void*arg)
{
structmyevent_s*ev=(structmyevent_s*)arg;
intlen;

len=send(fd,ev->buf,ev->len,0);//直接将数据回射给客户端

eventdel(g_efd,ev);//从红黑树g_efd中移除

if(len>0)
{
printf("send[fd=%d],[%d]%s
",fd,len,ev->buf);
eventset(ev,fd,recvdata,ev);//将该fd的回调函数改为recvdata
eventadd(g_efd,EPOLLIN,ev);//重新添加到红黑树上,设为监听读事件
}
else
{
close(ev->fd);//关闭链接
printf("send[fd=%d]error%s
",fd,strerror(errno));
}
return;
}

/*创建socket,初始化lfd*/

voidinitlistensocket(intefd,shortport)
{
structsockaddr_insin;

intlfd=socket(AF_INET,SOCK_STREAM,0);
fcntl(lfd,F_SETFL,O_NONBLOCK);//将socket设为非阻塞

memset(&sin,0,sizeof(sin));//bzero(&sin,sizeof(sin))
sin.sin_family=AF_INET;
sin.sin_addr.s_addr=INADDR_ANY;
sin.sin_port=htons(port);

bind(lfd,(structsockaddr*)&sin,sizeof(sin));

listen(lfd,20);

/*voideventset(structmyevent_s*ev,intfd,void(*call_back)(int,int,void*),void*arg);*/
eventset(&g_events[MAX_EVENTS],lfd,acceptconn,&g_events[MAX_EVENTS]);

/*voideventadd(intefd,intevents,structmyevent_s*ev)*/
eventadd(efd,EPOLLIN,&g_events[MAX_EVENTS]);//将lfd添加到监听树上,监听读事件

return;
}

intmain()
{
intport=SERV_PORT;

g_efd=epoll_create(MAX_EVENTS+1);//创建红黑树,返回给全局g_efd
if(g_efd<= 0)
            printf("create efd in %s err %s
", __func__, strerror(errno));

    initlistensocket(g_efd, port); //初始化监听socket

    struct epoll_event events[MAX_EVENTS + 1];  //定义这个结构体数组,用来接收epoll_wait传出的满足监听事件的fd结构体
    printf("server running:port[%d]
", port);

    int checkpos = 0;
    int i;
    while(1)
    {
    /*    long now = time(NULL);
        for(i=0; i < 100; i++, checkpos++)
        {
            if(checkpos == MAX_EVENTS);
                checkpos = 0;
            if(g_events[checkpos].status != 1)
                continue;
            long duration = now -g_events[checkpos].last_active;
            if(duration >=60)
{
close(g_events[checkpos].fd);
printf("[fd=%d]timeout
",g_events[checkpos].fd);
eventdel(g_efd,&g_events[checkpos]);
}
}*/
//调用eppoll_wait等待接入的客户端事件,epoll_wait传出的是满足监听条件的那些fd的structepoll_event类型
intnfd=epoll_wait(g_efd,events,MAX_EVENTS+1,1000);
if(nfd< 0)
        {
            printf("epoll_wait error, exit
");
            exit(-1);
        }
        for(i = 0; i < nfd; i++)
        {
            //evtAdd()函数中,添加到监听树中监听事件的时候将myevents_t结构体类型给了ptr指针
            //这里epoll_wait返回的时候,同样会返回对应fd的myevents_t类型的指针
            struct myevent_s *ev = (struct myevent_s *)events[i].data.ptr;
            //如果监听的是读事件,并返回的是读事件
            if((events[i].events & EPOLLIN) &&(ev->events&EPOLLIN))
{
ev->call_back(ev->fd,events[i].events,ev->arg);
}
//如果监听的是写事件,并返回的是写事件
if((events[i].events&EPOLLOUT)&&(ev->events&EPOLLOUT))
{
ev->call_back(ev->fd,events[i].events,ev->arg);
}
}
}
return0;
}

最后学习epoll一些建议

需要深入理解epoll LT和ET方式下的读写差别,怎么优雅地处理各种错误;

需要关注多线程负载均衡,惊群效应等问题,要用 epoll 实现负载均衡并且避免数据竞争,必须掌握好 EPOLLONESHOT 和 EPOLLEXCLUSIVE 这两个标志;

理解epoll不足之处:

定时的精度不够,只到5ms级别,select可以到0.1ms;

当连接数少并且连接都十分活跃的情况下,select和poll的性能可能比epoll好;

epoll_ctrl每次只能够修改一个fd(kevent可以一次改多个,每次修改,epoll需要一个系统调用,不能 batch 操作,可能会影响性能)。

可能会在定时到期之前返回,导致还需要下一个epoll_wait调用。

原文标题:揭秘Linux高性能服务epoll 的本质

文章出处:【微信公众号:马哥Linux运维】欢迎添加关注!文章转载请注明出处。

审核编辑:彭静

声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉
  • 接口
    +关注

    关注

    33

    文章

    7640

    浏览量

    148512
  • 操作系统
    +关注

    关注

    37

    文章

    6288

    浏览量

    121896
  • epoll
    +关注

    关注

    0

    文章

    28

    浏览量

    2913

原文标题:揭秘Linux高性能服务epoll 的本质

文章出处:【微信号:magedu-Linux,微信公众号:马哥Linux运维】欢迎添加关注!文章转载请注明出处。

收藏 人收藏

    评论

    相关推荐

    epoll的使用

    &lt; 0){perror("fail to epoll!");return -1;}listenfd = tcp_create_socket();fcntl(listenfd
    发表于 05-11 13:22

    我读过的最好的epoll讲解

    出数据,或者写入数据的流,对他们进行操作。但是使用select,我们有O(n)的无差别轮询复杂度,同时处理的流越多,每一次无差别轮询时间就越长。说了这么多,终于能好好解释epollepol
    发表于 05-12 15:30

    epoll使用方法与poll的区别

    因为epoll的触发机制是在内核中直接完成整个功能 那个事件准备就绪我就直接返回这个IO事件
    发表于 07-31 10:03

    epoll_wait的事件返回的fd为错误是怎么回事?

    netlink 的 socket 连接 的 fd 为18,但是添加到epollepoll_wait()返回的fd 为 0为什么会出现这样的现象?补充 说明:1、 epoll_wait返回
    发表于 06-12 09:03

    揭示EPOLL一些原理性的东西

    越多,没一次无差别轮询时间就越长。再次说了这么多,终于能好好解释epollepoll可以理解为event poll,不同于忙轮询和无差别轮询,ep
    发表于 08-24 16:32

    【米尔王牌产品MYD-Y6ULX-V2开发板试用体验】socket通信和epoll

    ;gt;#include &lt;sys/epoll.h>#include "ssd1306.h"const int PORT = 8888
    发表于 11-10 15:31

    epoll和select的区别

     select,epoll都是IO多路复用的机制。I/O多路复用就通过一种机制,可以监视多个描述符,一旦某个描述符就绪(一般是读就绪或者写就绪),能够通知程序进行相应的读写操作。但select
    发表于 11-10 16:20 1.9w次阅读
    <b class='flag-5'>epoll</b>和select的区别

    poll&&epollepoll实现

    poll&&epollepoll实现
    发表于 05-14 14:34 2644次阅读
    poll&&<b class='flag-5'>epoll</b>之<b class='flag-5'>epoll</b>实现

    一文详解epoll的实现原理

    本文以四个方面介绍epoll的实现原理,1.epoll的数据结构;2.协议栈如何与epoll通信;3.epoll线程安全如何加锁;4.ET
    的头像 发表于 08-01 13:28 3476次阅读

    epoll来实现多路复用

    本人用epoll来实现多路复用,epoll触发模式有两种: ET(边缘模式) LT(水平模式) LT模式 是标准模式,意味着每次
    的头像 发表于 11-09 10:15 215次阅读
    用<b class='flag-5'>epoll</b>来实现多路复用

    epoll 的实现原理

    今儿我们就从源码入手,来帮助大家简单理解一下 epoll 的实现原理,并在后边分析一下,大家都说 epoll 性能好,那到底是好在哪里。 epoll 简介 1、epoll 的简单使用
    的头像 发表于 11-09 11:14 236次阅读
    <b class='flag-5'>epoll</b> 的实现原理

    epoll的基础数据结构

    一、epoll的基础数据结构 在开始研究源代码之前,我们先看一下 epoll 中使用的数据结构,分别是 eventpoll、epitem 和 eppoll_entry。 1、eventpoll 我们
    的头像 发表于 11-10 10:20 336次阅读
    <b class='flag-5'>epoll</b>的基础数据结构

    epoll的触发模式介绍

    前言 epoll的触发模式是个引发讨论非常多的话题,网络上这方面总结的文章也很多,首先从名字上就不是很统一,LT模式常被称为水平触发、电平触发、条件触发,而ET模式常被称为边缘触发、边沿触发等,这些
    的头像 发表于 11-10 14:54 340次阅读

    epollLT模式总结

    epoll的触发模式是个引发讨论非常多的话题,网络上这方面总结的文章也很多,首先从名字上就不是很统一,LT模式常被称为水平触发、电平触发、条件触发,而ET模式常被称为边缘触发、边沿触发等,这些都是
    的头像 发表于 11-10 15:35 244次阅读

    epoll源码分析

    Linux内核提供了3个关键函数供用户来操作epoll,分别是: epoll_create(), 创建eventpoll对象 epoll_ctl(), 操作eventpoll对象
    的头像 发表于 11-13 11:49 523次阅读
    <b class='flag-5'>epoll</b>源码分析