EventLoop 类图 及成员 : Channel , Poller , EventLoop
muduo事件通知机制 : eventfd
EventLoop
工作流程大致如下
EventLoop :
- Reactor(mainReactor和subReactor)的核心!!!
主要包含
- Channel
- Poller – epoll的抽象
Channel
实际使用中可分为三种channel : acceptChannel(listengingfd) , connfdChannel,(connfd) wakeupChannel(wakeupFd)
Channel 封装
- 管理的fd
- 挂在epoll tree上的fd的事件 如fd的EPOLLIN、EPOLLOUT
- poller实际监听到的事件
- EventHandler
重要成员如下
1
2
3
4
5
6
7
8
9EventLoop *loop_; // EventLoop channel归属于哪个EventLoop 为了获取poller
const int fd_; // fd
int events_; // fd感兴趣的事件的集合(注册在epoll tree上要被监听的事件的集合)
int revents_; // poller 实际监听到的事件
std::weak_ptr<void> tie_; // 防止channel调用回调时 TcpConnection被销毁。
ReadEventCallback readCallback_; // 读回调
EventCallback writeCallback_; // 写回调
EventCallback closeCallback_; // 连接关闭回调
EventCallback errorCallback_; // error回调重要方法如下
update:将fd及evens_注册在epoll Tree上
- Channel enable/disable -> update -> EventLoop -> Poller -> epoll_ctl
1
2
3
4
5void Channel::update()
{
loop_->updateChannel(this);
// Eventloop -> Poller -> epoll_ctl 更新fd注册在事件表上的event
}
- Channel enable/disable -> update -> EventLoop -> Poller -> epoll_ctl
handleEvent:根据revents,调用相应callback
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17void Channel::handleEvent(const Timestamp& receiveTime)
{
// ...
handlerEventWithGuard(receiveTime);
}
// 根据poller通知的channel发生的具体事件,由channel负责调用具体的回调操作
void Channel::handlerEventWithGuard(const Timestamp& receiveTime)
{
// 大致如下
// 对端关闭
if(!(revents_ & EPOLLIN) && (revents_ & EPOLLHUP))
closeCallback_();
// 普通的读事件
if(revents_ & (EPOLLIN | EPOLLPRI))
readCallback();
}setXXXCalllback
enable/disable Reading/Writing…
- 注册到poller上 / 从poller上移除
Poller
Poller:封装 event-based核心的IO复用(epoll_开头的相关)函数 的模块。
epoll为例子
重要成员
- epollfd_ : epfd
- events_ : 存储发生事件的集合。传递给epoll
- ownerLoop_ : 所属eventLoop
- channels<fd,Channel*> : 记录注册在poller上的fd以及channel。用于将监听的channel 和 记录的channel 进行检验。
重要方法
poll:
- epoll_wait
- 并将fillActiveChannels,将发生的事件传递给ownerLoop_
- 适时扩容events_
1
2
3
4
5
6
7
8
9
10
11
12// 核心代码如下
Timestamp EpollPoller::poll(int timeoutMs,ChannelList *activeChannels)
{
// epoll_wait
// epoll_event[] 传入传出参数 用来存内核得到事件的集合,
// maxevents epoll总共需要监听多少个events
int numEvents = epoll_wait(epollfd_ , events_.begin().base() , static_cast<int>(events_.size()) , timeoutMs);
fillActiveChannels(numEvents,activeChannels);
return Timestamp::now();
}
removeChannel/updateChannel : 将事件注册/更改/移除
EventLoop 核心逻辑
EventLoop 是 Reactor的核心,是event-based模型的核心。
- 作用:循环 监听事件,获取活跃事件,并且根据发生events,来handleEvent
- EventLoop 通过调用 Poller 和 Channel 并协作。
- Poller: 监听并获取发生事件 上传给EventLoop(activeChannelList)
- Channel: 封装fd以及相应event以及相应callback。EventLoop根据Poller上传的channel 来调用handleEvent 处理事件。
EventLoop 最重要的函数!!: loop : 核心逻辑如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24// loop -> epoll_wait -> handleEvent(IO线程监听到的event) -> doPendingFunctor()(其他worker线程加入的event cb)
void EventLoop::loop()
{
// 保证是eventloop所属的thread使用该eventloop的loop方法
assertInLoopThread();
while(!quit_)
{
activeChannels_.clear();
// 监听两类fd
// 一种是 listenfd / connfd
// 一种是 **wakeupfd**
pollReturnTime_ = poller_->poll(kPollerTimeMs,&activeChannels_);
// 对于mainReactor的IO loop , 负责监听listenfd , handler : 接收新来的连接 并将新来的连接派发给subReactor(其他thread的eventloop)
// 对于subReactor的IO loop , 负责监听connfd , handler : 处理 已连接socket上发生的事件.可以在subreactor的ioloop中处理 也可将任务打包提交给线程池处理
for(Channel *channel:activeChannels_)
{
channel->handleEvent(pollReturnTime_);
}
// 执行添加给本eventLoop的回调操作(可能是本thread或其他thread)
doPendingFunctors();
}
}
One loop per thread
- one loop per thread : 为了充分利用多核cpu.每一个核的cpu上跑一个Loop的Thread。
事件通知机制
eventfd
eventfd : 可用于唤醒等待中的epoll_wait,select等. 以此来作为一种线程间的事件通知机制
- thread A处于epoll_wait. 且eventfd注册在上面.
- thread B写eventfd, 唤醒thread A的epoll_wait. 让thread A向下执行,处理事务.
函数如下
- int eventfd(unsigned int initval, int flags);
- eventfd()创建了一个 eventfd object , 其中拥有着一个64bit的整形计数器. 该计数器中的值为user传入的initval
- read : 成功读取返回一个8byte的整数。read(2)如果提供的缓冲区的大小小于8个字节返回错误EINVAL
- write将缓冲区写入的8字节整形值加到内核计数器上。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
using namespace std;
void threadFunc(int eventFd)
{
sleep(5);
uint64_t buf = 999;
ssize_t n = write(eventFd,&buf,sizeof buf);
if(n!=sizeof buf)
{
cout<<"write "<<n<<" bytes instead of 8"<<endl;
exit(-1);
}
}
int main()
{
int eventFd = eventfd(0,EFD_NONBLOCK | EFD_CLOEXEC);
cout<<"eventFd : "<<eventFd<<endl;
int epfd = epoll_create(1);
struct epoll_event events[5];
{
struct epoll_event event;
event.data.fd = eventFd;
event.events = EPOLLIN;
epoll_ctl(epfd,EPOLL_CTL_ADD,eventFd,&event);
}
std::thread t(threadFunc,eventFd);
while(true)
{
int numEvents = epoll_wait(epfd,events,5,1000);
if(numEvents == 0)
{
cout<<"timeout"<<endl;
continue;
}
assert(numEvents == 1);
assert(events[0].data.fd == eventFd);
uint64_t buf = 0;
unsigned long n = read(eventFd,&buf,sizeof buf);
if(n!=sizeof buf)
{
cout<<"read "<<n<<" bytes instead of 8"<<endl;
exit(-1);
}
else
{
cout<<"read "<<n<<" bytes "<<buf<<endl;
}
break;
}
t.join();
close(eventFd);
cout<<"finished"<<endl;
return 0;
}
shc@DESKTOP-TVUERHD:~/Code/try$ ./eventFd
eventFd : 3
timeout
timeout
timeout
timeout
read 8 bytes 999
finished
wakeupFd
eventfd封装
int wakeupFd_; 主要作用:当mainLoop获取一个新用户的channel,通过轮询算法选择一个subLoop 通过wakeupFd唤醒subLoop处理channel
wakeup : EventLoop提供给外界唤醒其epoll_wait的
1
2
3
4
5
6
7
8
9
10
11
12// 用来唤醒loop所在的(所属于的)线程(IO线程)。向wakeupFd_写一个数据。
// wakeupChannel就发生读事件 当前loop线程就会被唤醒。
void EventLoop::wakeup()
{
LOG_DEBUG("wakeup");
uint64_t one = 1;
ssize_t n = write(wakeupFd_,&one,sizeof one);
if(n != sizeof one)
{
LOG_ERROR("EventLoop:wake() writes %lu bytes instead of 8\n",n);
}
}Eventloop处理eventFd的读事件. 防止LT下busyloop.
1
2
3
4
5
6
7
8
9
10void EventLoop::handleWakeUpFd(const Timestamp& )
{
LOG_DEBUG("handle wakeupFd\n");
uint64_t one = 1;
ssize_t n = read(wakeupFd_,&one,sizeof one);
if(n!=sizeof one)
{
LOG_ERROR("EventLoop::handleRead() reads %ld bytes instead of 8\n",n);
}
}
分发连接
问题:mainReactor如何分发连接 ? 当mainReactor上的Acceptor建立新的连接时,如何将该连接(TCPConnection(connfd))的读写事件注册给subReactor的eventloop中的poller? 让subReactor负责该connfd的所有读写的监听以及回调.
- 这也就是所谓的muduo的事件通知机制.
- 通过eventfd(wakeupFd_).统一事件源
事件分发机制概述:每个eventloop都有一个eventfd,其注册在自己loop的epoll上. 外界在想唤醒eventloop,让他处理事件的时候,就向eventloop放入处理事件的callback,然后通过wakeup()向eventFd写8bytes,将eventloope从poll_wait唤醒,处理放入的callback.
那么回到问题.mainReactor如何分发连接 ?
- 连接建立后,mainReactor上的acceptor调用TcpServer::newConnection,为connfd指定eventloop(subloop),(TCPConnection)设置读写回调. 然后调用subloop的runInLoop->queueInLoop->wakeup(), subloop从epoll_wait中离开,执行connectionEstablished.
- queueInLoop会将TcpConnection::connectionEstablished交给该eventloop. 该callback就是将connfd注册到该loop的poller
- 到此,分发完成.
- 之后该eventloop负责该connfd的所有读写事件.
- 连接建立后,mainReactor上的acceptor调用TcpServer::newConnection,为connfd指定eventloop(subloop),(TCPConnection)设置读写回调. 然后调用subloop的runInLoop->queueInLoop->wakeup(), subloop从epoll_wait中离开,执行connectionEstablished.
code
- mainReactor(main thread)分发连接代码
- mainReactor的Acceptor
1
2
3
4
5
6
7
8
9
10
11
12// 有一个新的客户端的连接,eventloop 会调用acceptor的handleConnection handleConnection会执行这个回调操作
void TcpServer::newConnection(int connfd, const InetAddress &peerAddr, const InetAddress &localAddr)
{
LOG_DEBUG("TcpServer::newConnection [%s] - new connection [%s] from %s \n",
name_.c_str(), connName.c_str(), peerAddr.toIpPort().c_str());
// mainReactor的Acceptor为connfd指定eventloop
EventLoop *ioLoop = threadPool_->getNextLoop();
TcpConnection conn(ioLoop,connfd)
conn->setUserCallback(...);
// 调用eventloop的runInLoop -> queueInLoop 分发连接
ioLoop->runInLoop(std::bind(&TcpConnection::connectionEstablished, conn));
} - mainReactor调用subReactor的eventloop的runInLoop
- 将connectionEstablished放入eventloop(pendingFunctor)
- wakeup唤醒eventloop
1
2
3
4
5
6
7
8
9
10// 把cb放入队列 (wakeupfd唤醒loop所在线程 执行cb)
void EventLoop::queueInLoop(const Functor& cb)
{
pendingFunctors_.emplace_back(cb);
if(!isInLoopThread())
{
// main thread 通过loop对象wakeup方法 唤醒卡在epoll wait的eventloop
wakeup();
}
}
- subReactor的Eventloop被唤醒。在doPendingFunctor中通过TcpConnection::connectionEstablished真正实现分发连接. 将该connfd的读写事件注册到本channel所属TCPConnection的loop的poller上(每个loop都有一个poller channel只要知道自己是哪个loop就能找到对应的poller.)
1
2
3
4
5void TcpConnection::connectionEstablished()
{
// 注册! connfd 到 loop's poller!
channel_->enableReading();
}
- mainReactor的Acceptor
小结
一言以bi之:
- mainReactor在建立新连接后,通过写eventFd唤醒subReactor. subReactor将connfd注册在其epoll上,之后负责监听处理connfd所有的读写事件.
所谓统一事件源:
subReactor要处理的事件只有监听connfd的读写,并不负责建立新连接. 试想(我猜的)如果没有eventfd. 那么 mainReactor接收新连接后要实现分:就只能通过信号通知subReactor,亦或者是放入一个任务队列 ,subReactor定期从中拿取新连接. 都过于复杂或者性能低.
而有了eventFd,mainReactor可以向其中写来唤醒subReactor. 从subReactor的角度来看也是将事件 : 监听新connfd 转化成了一个 eventfd的读事件. 也是通过epoll监听. 和其他正常connfd的读写一样.