muduo类图
TcpServer - multi/single-Reactor
全图
大图
建立新连接+分发新连接
已连接读写
销毁连接
Acceptor
Acceptor:封装了listening fd 以及 连接event 以及 处理连接事件的callback : handleConnection
重要成员
- acceptSocket_:listening fd
- acceptChannel_:listening fd 以及 监听事件 以及 回调
- EventLoop *loop:listening fd 所属的eventLoop。一般是main thread 的 eventLoop.
- newConnectionCallback_: TcpServer构造函数中将 TcpServer::newConnection 绑定给acceptor。故acceptor接收连接之后 分发client的channel给ioloop。
重要函数
handleConnection
- 当poller epoll wait 监听到连接事件时 会调用该回调
- handleConnection : accept连接 得到新connfd
- newConnectionCallback(TcpServer绑定的) : 本mainReactor选择一个subReactor 将连接派发。
- 核心逻辑如下
1
2
3
4
5
6
7
8void Acceptor :: handleConnection(Timestamp)
{
InetAddress peerAddr;
int connfd = acceptSocket_.accept(&peerAddr); // accpet
InetAddress localAddr(InetAddress::getSockName(connfd)); // 获取本机信息
newConnectionCallback_(connfd,peerAddr,localAddr); // 轮询subloop 唤醒 分发当前新client的channel
...
}
- 当poller epoll wait 监听到连接事件时 会调用该回调
listen
- socket开启listen队列,poller监听连接事件,等待connect。
TcpConnection
与acceptor平级
- acceptor : 负责listening fd 的accept。
- mainloop中的 封装listenfd、相关事件及回调.(监听、接受连接、分发连接给subLoop)
- tcpconntcion : 负责一个connfd的通信
- subloop中的 封装已连接connfd、相关事件及回调.(读消息事件、发送消息事件、连接关闭事件、错误事件等)
- acceptor : 负责listening fd 的accept。
功能!!!
- 1. encapsulate fd into channel
- 2. register callback into channel and keep callback
- 3. bind ownerLoop and channel(fd) : channel enabling -> reigster on the poller. 即所谓的派发连接给subloop,就是将指定subloop的poller监听、处理channel(fd)的event。
- 4. keep channel(fd)
- 重要成员
- socket_ : connfd
- connectionEstablished: 连接建立完成后调用
- 将connfd的读事件注册在poller上
- channel_->enableReading()
- 调用user set connectionCallback
- connectionEstablished在TcpServer::newConnection中被调用
- 将connfd的读事件注册在poller上
- connectionDestroyed : 销毁连接
- 会调用到user set connectionCallback
- 销毁连接时会通过调用TcpServer::removeConnection来销毁连接
- removeConnection -> removeConnectionInLoop -> connectionDestroyed
- channel_ : connfd的事件(感兴趣和实际发生)以及回调(读、写、关闭连接等)。
- 这个Channel的callback是在TcpConnection对象构造函数中注册的。
- 这些callback最终都会在loop的handleEvent中被调用
- TcpConnection::handleRead / handleClose / handleError / handleWrite
- handleRead中会调用user set的messageCallback_
- 同理 handleWrite 有 user set 的 writeCompleteCallback_
- handleCose 有 closeCallback_
- 这个Channel的callback是在TcpConnection对象构造函数中注册的。
- **loop_**:本TcpConnection所属的subloop
- inputBuffer_:user态的接收缓冲区。
- outputBuffer_:user态的发送缓冲区。
- state_:当前TCP连接的状态(Connected、Connecting、Disconnecting、Disconnected)
- connetionCallback_、messageCallback_、writeCompleteCallback_
- 交给用户自定义。(这也是用户唯数不多的自由)
- closeCallback_ :
- muduo TcpServer自己定义的(不交给用户)
- 用户如何自定义?
- TcpServer给了用户这样的接口。如setConnectionCallback。之后TcpServer再传给TcpConnection
- 重要函数
- handleRead()
- 处理读事件,将内容存入inputBuffer
- 然后再调用user set的messageCallback_
- 核心代码
1
2
3
4
5
6
7
8
9
10
11
12// 注册给channel 这个函数最终会被ioloop调用
void TcpConnection::handleRead(Timestamp receiveTime)
{
// 将fd中可读的数据 读到inputBuffer中
ssize_t n = inputBuffer_.readFd(channel_->fd(),&savedErrno);
if(n>0)
messageCallback_(shared_from_this(),&inputBuffer_,receiveTime);
else if(n == 0)
handleClose();
else
handleError();
}
- handleWrite()
- 负责处理epoll上注册的Tcp连接的可写事件(即写数据给fd)。视情况与outputBuffer交互。
- 写完之后 调用 writeCompleteCallback_
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26void TcpConnection::handleWrite()
{
// outputBuffer中的bytes 写入 fd
ssize_t n = outputBuffer_.writeFd(channel_->fd(),&savedErrno);
// 写完 停止Poller监听
if(outputBuffer_.readableBytes() == 0)
{
channel_->disableWriting();
if(writeCompleteCallback_)
{
loop_->queueInLoop(
std::bind(writeCompleteCallback_,shared_from_this())
);
}
}
// 没写完 继续监听 稍后处理
else
{
LOG_INFO("I'm going to write more data");
}
}
// 如何做到一个事件的处理不长时间占用控制权?措施之一如下
// 如果一次没写完的话,不会阻塞在这里等待缓冲区空闲然后继续写
// 而是将控制交还给eventloop。因为写事件还在epoll tree上监听,并且未写的数据存在outputBuffer中
// 等待下一次循环poller监听到写事件 触发write即可
- handleRead()
- handleClose()
- 负责处理Tcp连接关闭的事件。
- 大概的处理逻辑就是
- 将本TCP Connection的fd 的 channel_从poller中移除。
- connectionCallback_ (user set)
- closeCallback_。(muduo set)
1
2
3
4
5
6
7
8
9
10
11
12
13// poller -> channel::closeCallback -> TcpConnection::handleClose -> TcpServer::removeConnection -> TcpConnection::connectionDestoyed
void TcpConnection::handleClose()
{
// 改变状态
setState(kDisconnected);
// 从poller上拿下来
channel_->disableAll();
TcpConnectionPtr connPtr(shared_from_this());
// user设置的
connectionCallback_(connPtr); // 连接断开和连接的回调
// muduo set TcpServer::removeConnection -> TcpConnection::connectionDestroyed
closeCallback_(connPtr); // 关闭连接的回调 TcpServer::removeConnection
}
- send
- send -> sendInLoop ->注册handleWrite
1
2
3
4
5
6
7
8
9
10void TcpConnection::send(const std::string &buf)
{
sendInLoop(buf.c_str(),buf.size());
or loop_->runInLoop(std::bind(
&TcpConnection::sendInLoop,
this,
buf.c_str(),
buf.size()
));
}
- send -> sendInLoop ->注册handleWrite
- sendInLoop : 与outputBuffer和Poller交互,执行发送
TcpServer
Acceptor ,EventLoopThreadPool ,TcpConnection
重要成员:
- unique_ptr<Acceptor.> acceptor_
- TcpServer::newConnection 绑定给acceptor
- acceptor接收新连接之后 将该连接分发给threadPool中的subloop(subReactor)
- shared_ptr<EventLoopThreadPool.> threadPool_
- subReactors pool
重要函数
newConnection :accept新连接之后, 会调用acceptor的handleConnection,handleConnection会执行newConnection。
- 将新连接派发给subloop,即将新建立连接的connfd的监听和处理都交给一个ioloop
- 如何做到派发?
- 通过new TcpConnection。TcpConnection会负责将subloop和connfd的channel绑定在一起。即将channel注册在subloop的poller上(在TcpConnection::connectionEstablished中执行)。
- 如何唤醒 派发到的(subloop)subReactor,使其处理新连接的connectionEstablished ?
- wakeupFd
- subloop此时应当阻塞在poller wait上。main thread 通过向其监听的wakeupFd写入data,使其从poller wait中离开。
- 将新连接派发给subloop,即将新建立连接的connfd的监听和处理都交给一个ioloop
removeConnection:注册在TcpConnection::handleClose中。
- 关闭连接。流程见下文
start
- 启动EventLoopThreadPool中的subloop
- mainloop 开启listening queue 并注册监听listenfd到mainloop的Poller上。
setsubThreadNum
- 设置subReactor数量。
- num = 0. 则只有一个reactor,既负责监听、处理listening fd的新连接建立事件,又负责监听、处理已连接socket connfd的读写事件。
- num > 0. 则一个mainReactor num个subReacor。
- mainReactor 的mainloop负责监听处理listeningfd的新连接建立事件。
- subReactor 的subloop负责监听处理connfd的读写事件
- num = 0. 则只有一个reactor,既负责监听、处理listening fd的新连接建立事件,又负责监听、处理已连接socket connfd的读写事件。
- 可见 blog服务设计范式
- 设置subReactor数量。
- 以及callback之间关系
- 还有,muduo中关于callback的使用。即function 和 bind的组合使用。
- 有一件大任务,需要分给几个组件来做,或者需要分成不同时期来做
- 上层做不完的事情/不应当在此时调用的事情 ,上层将他自己要做的那一部分事情存入函数。将函数注册给下层。下层执行下层可以做的事情,并适时执行上层注册函数,来完成整个任务。