不落辰

知不可乎骤得,托遗响于悲风

0%

muduo-EventLoop

EventLoop 类图 及成员 : Channel , Poller , EventLoop
muduo事件通知机制 : eventfd

EventLoop

  • 工作流程大致如下

  • EventLoop :

    • Reactor(mainReactor和subReactor)的核心!!!
  • 主要包含

    • Channel
    • Poller – epoll的抽象

Channel

  • 实际使用中可分为三种channel : acceptChannel(listengingfd) , connfdChannel,(connfd) wakeupChannel(wakeupFd)

  • Channel 封装

    • 管理的fd
    • 挂在epoll tree上的fd的事件 如fd的EPOLLIN、EPOLLOUT
    • poller实际监听到的事件
    • EventHandler
  • 重要成员如下

    1
    2
    3
    4
    5
    6
    7
    8
    9
    EventLoop *loop_; //  EventLoop channel归属于哪个EventLoop 为了获取poller
    const int fd_; // fd
    int events_; // fd感兴趣的事件的集合(注册在epoll tree上要被监听的事件的集合)
    int revents_; // poller 实际监听到的事件
    std::weak_ptr<void> tie_; // 防止channel调用回调时 TcpConnection被销毁。
    ReadEventCallback readCallback_; // 读回调
    EventCallback writeCallback_; // 写回调
    EventCallback closeCallback_; // 连接关闭回调
    EventCallback errorCallback_; // error回调
  • 重要方法如下

  • update:将fd及evens_注册在epoll Tree上

    • Channel enable/disable -> update -> EventLoop -> Poller -> epoll_ctl
      1
      2
      3
      4
      5
      void Channel::update()
      {
      loop_->updateChannel(this);
      // Eventloop -> Poller -> epoll_ctl 更新fd注册在事件表上的event
      }
  • handleEvent:根据revents,调用相应callback

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    void Channel::handleEvent(const Timestamp& receiveTime)
    {
    // ...
    handlerEventWithGuard(receiveTime);
    }

    // 根据poller通知的channel发生的具体事件,由channel负责调用具体的回调操作
    void Channel::handlerEventWithGuard(const Timestamp& receiveTime)
    {
    // 大致如下
    // 对端关闭
    if(!(revents_ & EPOLLIN) && (revents_ & EPOLLHUP))
    closeCallback_();
    // 普通的读事件
    if(revents_ & (EPOLLIN | EPOLLPRI))
    readCallback();
    }
  • setXXXCalllback

  • enable/disable Reading/Writing

    • 注册到poller上 / 从poller上移除

Poller

  • Poller:封装 event-based核心的IO复用(epoll_开头的相关)函数 的模块。

  • epoll为例子

  • 重要成员

    • epollfd_ : epfd
    • events_ : 存储发生事件的集合。传递给epoll
    • ownerLoop_ : 所属eventLoop
    • channels<fd,Channel*> : 记录注册在poller上的fd以及channel。用于将监听的channel 和 记录的channel 进行检验。
  • 重要方法

  • poll

    • epoll_wait
    • 并将fillActiveChannels,将发生的事件传递给ownerLoop_
    • 适时扩容events_
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      //  核心代码如下
      Timestamp EpollPoller::poll(int timeoutMs,ChannelList *activeChannels)
      {
      // epoll_wait
      // epoll_event[] 传入传出参数 用来存内核得到事件的集合,
      // maxevents epoll总共需要监听多少个events
      int numEvents = epoll_wait(epollfd_ , events_.begin().base() , static_cast<int>(events_.size()) , timeoutMs);

      fillActiveChannels(numEvents,activeChannels);

      return Timestamp::now();
      }
  • removeChannel/updateChannel : 将事件注册/更改/移除

EventLoop 核心逻辑

  • EventLoop 是 Reactor的核心,是event-based模型的核心。

    • 作用:循环 监听事件,获取活跃事件,并且根据发生events,来handleEvent
    • EventLoop 通过调用 Poller 和 Channel 并协作。
    • Poller: 监听并获取发生事件 上传给EventLoop(activeChannelList)
    • Channel: 封装fd以及相应event以及相应callback。EventLoop根据Poller上传的channel 来调用handleEvent 处理事件。
  • EventLoop 最重要的函数!!: loop : 核心逻辑如下

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    // loop -> epoll_wait -> handleEvent(IO线程监听到的event) -> doPendingFunctor()(其他worker线程加入的event cb)
    void EventLoop::loop()
    {
    // 保证是eventloop所属的thread使用该eventloop的loop方法
    assertInLoopThread();
    while(!quit_)
    {
    activeChannels_.clear();
    // 监听两类fd
    // 一种是 listenfd / connfd
    // 一种是 **wakeupfd**
    pollReturnTime_ = poller_->poll(kPollerTimeMs,&activeChannels_);

    // 对于mainReactor的IO loop , 负责监听listenfd , handler : 接收新来的连接 并将新来的连接派发给subReactor(其他thread的eventloop)
    // 对于subReactor的IO loop , 负责监听connfd , handler : 处理 已连接socket上发生的事件.可以在subreactor的ioloop中处理 也可将任务打包提交给线程池处理
    for(Channel *channel:activeChannels_)
    {
    channel->handleEvent(pollReturnTime_);
    }

    // 执行添加给本eventLoop的回调操作(可能是本thread或其他thread)
    doPendingFunctors();
    }
    }

One loop per thread

  • one loop per thread : 为了充分利用多核cpu.每一个核的cpu上跑一个Loop的Thread。

事件通知机制

eventfd

  • eventfd : 可用于唤醒等待中的epoll_wait,select等. 以此来作为一种线程间的事件通知机制

    • thread A处于epoll_wait. 且eventfd注册在上面.
    • thread B写eventfd, 唤醒thread A的epoll_wait. 让thread A向下执行,处理事务.
  • 函数如下

    • int eventfd(unsigned int initval, int flags);
    • eventfd()创建了一个 eventfd object , 其中拥有着一个64bit的整形计数器. 该计数器中的值为user传入的initval
    • read : 成功读取返回一个8byte的整数。read(2)如果提供的缓冲区的大小小于8个字节返回错误EINVAL
    • write将缓冲区写入的8字节整形值加到内核计数器上。
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      58
      59
      60
      61
      62
      63
      64
      65
      66
      67
      68
      69
      70
      71
      72
      73
      74
      75
      76
      77
      78
      79
      #include <iostream>
      #include <cassert>
      #include <sys/eventfd.h>
      #include <unistd.h>
      #include <thread>
      #include <sys/epoll.h>

      using namespace std;

      void threadFunc(int eventFd)
      {
      sleep(5);
      uint64_t buf = 999;
      ssize_t n = write(eventFd,&buf,sizeof buf);

      if(n!=sizeof buf)
      {
      cout<<"write "<<n<<" bytes instead of 8"<<endl;
      exit(-1);
      }
      }

      int main()
      {
      int eventFd = eventfd(0,EFD_NONBLOCK | EFD_CLOEXEC);
      cout<<"eventFd : "<<eventFd<<endl;

      int epfd = epoll_create(1);
      struct epoll_event events[5];

      {
      struct epoll_event event;
      event.data.fd = eventFd;
      event.events = EPOLLIN;
      epoll_ctl(epfd,EPOLL_CTL_ADD,eventFd,&event);
      }

      std::thread t(threadFunc,eventFd);

      while(true)
      {
      int numEvents = epoll_wait(epfd,events,5,1000);
      if(numEvents == 0)
      {
      cout<<"timeout"<<endl;
      continue;
      }
      assert(numEvents == 1);
      assert(events[0].data.fd == eventFd);
      uint64_t buf = 0;
      unsigned long n = read(eventFd,&buf,sizeof buf);
      if(n!=sizeof buf)
      {
      cout<<"read "<<n<<" bytes instead of 8"<<endl;
      exit(-1);
      }
      else
      {
      cout<<"read "<<n<<" bytes "<<buf<<endl;
      }
      break;
      }

      t.join();
      close(eventFd);

      cout<<"finished"<<endl;

      return 0;
      }

      shc@DESKTOP-TVUERHD:~/Code/try$ ./eventFd
      eventFd : 3
      timeout
      timeout
      timeout
      timeout
      read 8 bytes 999
      finished

wakeupFd

eventfd封装

  • int wakeupFd_; 主要作用:当mainLoop获取一个新用户的channel,通过轮询算法选择一个subLoop 通过wakeupFd唤醒subLoop处理channel

  • wakeup : EventLoop提供给外界唤醒其epoll_wait的

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    //  用来唤醒loop所在的(所属于的)线程(IO线程)。向wakeupFd_写一个数据。
    // wakeupChannel就发生读事件 当前loop线程就会被唤醒。
    void EventLoop::wakeup()
    {
    LOG_DEBUG("wakeup");
    uint64_t one = 1;
    ssize_t n = write(wakeupFd_,&one,sizeof one);
    if(n != sizeof one)
    {
    LOG_ERROR("EventLoop:wake() writes %lu bytes instead of 8\n",n);
    }
    }
  • Eventloop处理eventFd的读事件. 防止LT下busyloop.

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    void EventLoop::handleWakeUpFd(const Timestamp& )
    {
    LOG_DEBUG("handle wakeupFd\n");
    uint64_t one = 1;
    ssize_t n = read(wakeupFd_,&one,sizeof one);
    if(n!=sizeof one)
    {
    LOG_ERROR("EventLoop::handleRead() reads %ld bytes instead of 8\n",n);
    }
    }

分发连接

  • 问题:mainReactor如何分发连接 ? 当mainReactor上的Acceptor建立新的连接时,如何将该连接(TCPConnection(connfd))的读写事件注册给subReactor的eventloop中的poller? 让subReactor负责该connfd的所有读写的监听以及回调.

    • 这也就是所谓的muduo的事件通知机制.
    • 通过eventfd(wakeupFd_).统一事件源
  • 事件分发机制概述:每个eventloop都有一个eventfd,其注册在自己loop的epoll上. 外界在想唤醒eventloop,让他处理事件的时候,就向eventloop放入处理事件的callback,然后通过wakeup()向eventFd写8bytes,将eventloope从poll_wait唤醒,处理放入的callback.

  • 那么回到问题.mainReactor如何分发连接 ?

    • 连接建立后,mainReactor上的acceptor调用TcpServer::newConnection,为connfd指定eventloop(subloop),(TCPConnection)设置读写回调. 然后调用subloop的runInLoop->queueInLoop->wakeup(), subloop从epoll_wait中离开,执行connectionEstablished.
      • queueInLoop会将TcpConnection::connectionEstablished交给该eventloop. 该callback就是将connfd注册到该loop的poller
    • 到此,分发完成.
    • 之后该eventloop负责该connfd的所有读写事件.

code

  • mainReactor(main thread)分发连接代码
    • mainReactor的Acceptor
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      // 有一个新的客户端的连接,eventloop 会调用acceptor的handleConnection handleConnection会执行这个回调操作
      void TcpServer::newConnection(int connfd, const InetAddress &peerAddr, const InetAddress &localAddr)
      {
      LOG_DEBUG("TcpServer::newConnection [%s] - new connection [%s] from %s \n",
      name_.c_str(), connName.c_str(), peerAddr.toIpPort().c_str());
      // mainReactor的Acceptor为connfd指定eventloop
      EventLoop *ioLoop = threadPool_->getNextLoop();
      TcpConnection conn(ioLoop,connfd)
      conn->setUserCallback(...);
      // 调用eventloop的runInLoop -> queueInLoop 分发连接
      ioLoop->runInLoop(std::bind(&TcpConnection::connectionEstablished, conn));
      }
    • mainReactor调用subReactor的eventloop的runInLoop
      • 将connectionEstablished放入eventloop(pendingFunctor)
      • wakeup唤醒eventloop
        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        //  把cb放入队列 (wakeupfd唤醒loop所在线程 执行cb)
        void EventLoop::queueInLoop(const Functor& cb)
        {
        pendingFunctors_.emplace_back(cb);
        if(!isInLoopThread())
        {
        // main thread 通过loop对象wakeup方法 唤醒卡在epoll wait的eventloop
        wakeup();
        }
        }
    • subReactor的Eventloop被唤醒。在doPendingFunctor中通过TcpConnection::connectionEstablished真正实现分发连接. 将该connfd的读写事件注册到本channel所属TCPConnection的loop的poller上(每个loop都有一个poller channel只要知道自己是哪个loop就能找到对应的poller.)
      1
      2
      3
      4
      5
      void TcpConnection::connectionEstablished()
      {
      // 注册! connfd 到 loop's poller!
      channel_->enableReading();
      }

小结

  • 一言以bi之:

    • mainReactor在建立新连接后,通过写eventFd唤醒subReactor. subReactor将connfd注册在其epoll上,之后负责监听处理connfd所有的读写事件.
  • 所谓统一事件源:

  • subReactor要处理的事件只有监听connfd的读写,并不负责建立新连接. 试想(我猜的)如果没有eventfd. 那么 mainReactor接收新连接后要实现分:就只能通过信号通知subReactor,亦或者是放入一个任务队列 ,subReactor定期从中拿取新连接. 都过于复杂或者性能低.

  • 而有了eventFd,mainReactor可以向其中写来唤醒subReactor. 从subReactor的角度来看也是将事件 : 监听新connfd 转化成了一个 eventfd的读事件. 也是通过epoll监听. 和其他正常connfd的读写一样.