不落辰

知不可乎骤得,托遗响于悲风

0%

muduo-Multi-Reactors

muduo类图

TcpServer - multi/single-Reactor

  • 全图

  • 大图

  • 建立新连接+分发新连接

  • 已连接读写

  • 销毁连接

Acceptor

  • Acceptor:封装了listening fd 以及 连接event 以及 处理连接事件的callback : handleConnection

  • 重要成员

    • acceptSocket_:listening fd
    • acceptChannel_:listening fd 以及 监听事件 以及 回调
    • EventLoop *loop:listening fd 所属的eventLoop。一般是main thread 的 eventLoop.
    • newConnectionCallback_: TcpServer构造函数中将 TcpServer::newConnection 绑定给acceptor。故acceptor接收连接之后 分发client的channel给ioloop。
  • 重要函数

  • handleConnection

    • 当poller epoll wait 监听到连接事件时 会调用该回调
        1. handleConnection : accept连接 得到新connfd
        1. newConnectionCallback(TcpServer绑定的) : 本mainReactor选择一个subReactor 将连接派发。
    • 核心逻辑如下
      1
      2
      3
      4
      5
      6
      7
      8
      void Acceptor :: handleConnection(Timestamp)
      {
      InetAddress peerAddr;
      int connfd = acceptSocket_.accept(&peerAddr); // accpet
      InetAddress localAddr(InetAddress::getSockName(connfd)); // 获取本机信息
      newConnectionCallback_(connfd,peerAddr,localAddr); // 轮询subloop 唤醒 分发当前新client的channel
      ...
      }
  • listen

    • socket开启listen队列,poller监听连接事件,等待connect。

TcpConnection

  • 与acceptor平级

    • acceptor : 负责listening fd 的accept。
      • mainloop中的 封装listenfd、相关事件及回调.(监听、接受连接、分发连接给subLoop)
    • tcpconntcion : 负责一个connfd的通信
      • subloop中的 封装已连接connfd、相关事件及回调.(读消息事件、发送消息事件、连接关闭事件、错误事件等)
  • 功能!!!

    • 1. encapsulate fd into channel
    • 2. register callback into channel and keep callback
    • 3. bind ownerLoop and channel(fd) : channel enabling -> reigster on the poller. 即所谓的派发连接给subloop,就是将指定subloop的poller监听、处理channel(fd)的event。
    • 4. keep channel(fd)
  • 重要成员
    • socket_ : connfd
    • connectionEstablished: 连接建立完成后调用
      • 将connfd的读事件注册在poller上
        • channel_->enableReading()
      • 调用user set connectionCallback
      • connectionEstablished在TcpServer::newConnection中被调用
    • connectionDestroyed : 销毁连接
      • 会调用到user set connectionCallback
      • 销毁连接时会通过调用TcpServer::removeConnection来销毁连接
        • removeConnection -> removeConnectionInLoop -> connectionDestroyed
    • channel_ : connfd的事件(感兴趣和实际发生)以及回调(读、写、关闭连接等)。
      • 这个Channel的callback是在TcpConnection对象构造函数中注册的。
        • 这些callback最终都会在loop的handleEvent中被调用
        • TcpConnection::handleRead / handleClose / handleError / handleWrite
          • handleRead中会调用user set的messageCallback_
          • 同理 handleWrite 有 user set 的 writeCompleteCallback_
          • handleCose 有 closeCallback_
    • **loop_**:本TcpConnection所属的subloop
    • inputBuffer_:user态的接收缓冲区。
    • outputBuffer_:user态的发送缓冲区。
    • state_:当前TCP连接的状态(Connected、Connecting、Disconnecting、Disconnected)
    • connetionCallback_、messageCallback_、writeCompleteCallback_
      • 交给用户自定义。(这也是用户唯数不多的自由)
    • closeCallback_ :
      • muduo TcpServer自己定义的(不交给用户)
    • 用户如何自定义?
      • TcpServer给了用户这样的接口。如setConnectionCallback。之后TcpServer再传给TcpConnection
  • 重要函数
    • handleRead()
      • 处理读事件,将内容存入inputBuffer
      • 然后再调用user set的messageCallback_
      • 核心代码
        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        //  注册给channel 这个函数最终会被ioloop调用
        void TcpConnection::handleRead(Timestamp receiveTime)
        {
        // 将fd中可读的数据 读到inputBuffer中
        ssize_t n = inputBuffer_.readFd(channel_->fd(),&savedErrno);
        if(n>0)
        messageCallback_(shared_from_this(),&inputBuffer_,receiveTime);
        else if(n == 0)
        handleClose();
        else
        handleError();
        }
    • handleWrite()
      • 负责处理epoll上注册的Tcp连接的可写事件(即写数据给fd)。视情况与outputBuffer交互。
      • 写完之后 调用 writeCompleteCallback_
        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        22
        23
        24
        25
        26
        void TcpConnection::handleWrite()
        {
        // outputBuffer中的bytes 写入 fd
        ssize_t n = outputBuffer_.writeFd(channel_->fd(),&savedErrno);

        // 写完 停止Poller监听
        if(outputBuffer_.readableBytes() == 0)
        {
        channel_->disableWriting();
        if(writeCompleteCallback_)
        {
        loop_->queueInLoop(
        std::bind(writeCompleteCallback_,shared_from_this())
        );
        }
        }
        // 没写完 继续监听 稍后处理
        else
        {
        LOG_INFO("I'm going to write more data");
        }
        }
        // 如何做到一个事件的处理不长时间占用控制权?措施之一如下
        // 如果一次没写完的话,不会阻塞在这里等待缓冲区空闲然后继续写
        // 而是将控制交还给eventloop。因为写事件还在epoll tree上监听,并且未写的数据存在outputBuffer中
        // 等待下一次循环poller监听到写事件 触发write即可
  • handleClose()
    • 负责处理Tcp连接关闭的事件。
    • 大概的处理逻辑就是
      • 将本TCP Connection的fd 的 channel_从poller中移除。
      • connectionCallback_ (user set)
      • closeCallback_。(muduo set)
        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        //  poller ->  channel::closeCallback -> TcpConnection::handleClose -> TcpServer::removeConnection -> TcpConnection::connectionDestoyed
        void TcpConnection::handleClose()
        {
        // 改变状态
        setState(kDisconnected);
        // 从poller上拿下来
        channel_->disableAll();
        TcpConnectionPtr connPtr(shared_from_this());
        // user设置的
        connectionCallback_(connPtr); // 连接断开和连接的回调
        // muduo set TcpServer::removeConnection -> TcpConnection::connectionDestroyed
        closeCallback_(connPtr); // 关闭连接的回调 TcpServer::removeConnection
        }
  • send
    • send -> sendInLoop ->注册handleWrite
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      void TcpConnection::send(const std::string &buf)
      {
      sendInLoop(buf.c_str(),buf.size());
      or loop_->runInLoop(std::bind(
      &TcpConnection::sendInLoop,
      this,
      buf.c_str(),
      buf.size()
      ));
      }
  • sendInLoop : 与outputBuffer和Poller交互,执行发送

TcpServer

Acceptor ,EventLoopThreadPool ,TcpConnection

重要成员:

  • unique_ptr<Acceptor.> acceptor_
    • TcpServer::newConnection 绑定给acceptor
    • acceptor接收新连接之后 将该连接分发给threadPool中的subloop(subReactor)
  • shared_ptr<EventLoopThreadPool.> threadPool_
    • subReactors pool

重要函数

  • newConnection :accept新连接之后, 会调用acceptor的handleConnection,handleConnection会执行newConnection。

    • 将新连接派发给subloop,即将新建立连接的connfd的监听和处理都交给一个ioloop
      • 如何做到派发?
      • 通过new TcpConnection。TcpConnection会负责将subloop和connfd的channel绑定在一起。即将channel注册在subloop的poller上(在TcpConnection::connectionEstablished中执行)。
    • 如何唤醒 派发到的(subloop)subReactor,使其处理新连接的connectionEstablished ?
      • wakeupFd
      • subloop此时应当阻塞在poller wait上。main thread 通过向其监听的wakeupFd写入data,使其从poller wait中离开。
  • removeConnection:注册在TcpConnection::handleClose中。

    • 关闭连接。流程见下文
  • start

    • 启动EventLoopThreadPool中的subloop
    • mainloop 开启listening queue 并注册监听listenfd到mainloop的Poller上。
  • setsubThreadNum

    • 设置subReactor数量。
      • num = 0. 则只有一个reactor,既负责监听、处理listening fd的新连接建立事件,又负责监听、处理已连接socket connfd的读写事件。
      • num > 0. 则一个mainReactor num个subReacor。
        • mainReactor 的mainloop负责监听处理listeningfd的新连接建立事件。
        • subReactor 的subloop负责监听处理connfd的读写事件
    • 可见 blog服务设计范式
  • 以及callback之间关系
  • 还有,muduo中关于callback的使用。即function 和 bind的组合使用。
  • 有一件大任务,需要分给几个组件来做,或者需要分成不同时期来做
    • 上层做不完的事情/不应当在此时调用的事情 ,上层将他自己要做的那一部分事情存入函数。将函数注册给下层。下层执行下层可以做的事情,并适时执行上层注册函数,来完成整个任务。