- 整理一下学习收获 先整理下muduo Buffer 因为属于是实现比较简单的组件。还比较巧妙。且便于移植。
- muduo Buffer
- 参考 源码 以及 《Linux多线程服务器编程》
non-blocking 网络编程中 应用层Buffer的必要性
non-blocking的核心思想:
- 为了可以最大限度的复用 thread-of-control,让一个thread服务于多个socket连接。
- 即,控制权应当尽量交给event loop(因为只有event loop才能接受新连接或者检测到socket的事件)
- 即,不让reactor的loop在没必要阻塞的地方阻塞。
- IO thread 只能阻塞在 IO multiplexing函数
- 因此要避免阻塞在read/write/或者其他IO系统调用上
- 为了可以最大限度的复用 thread-of-control,让一个thread服务于多个socket连接。
TcpConnection中有一个inputBuffer和一个outputBuffer
- 这里的intput output是针对user代码而言
outputBuffer的必要性
概述:muduo中,user不会自己直接write socket,也不会直接操作outputBuffer,而是间接的。通过TcpConnection::send(),来决定是write socket 还是操作outputBuffer。
- 不是所有数据都会先写入outputBuffer 再发送给socket的。也会有不用写入outputBuffer直接write给socket的。但不是用户调用write。用户调用TcpConnection::send(),之后就是send的逻辑了。决定write还是outputBuffer。用户不必再管。
必要性
- 场景:
- 程序通过TCP连接发送100KB数据,但是在write调用中,os只接受了80KB。
- 这也就是常说的:应用写的快,而内核发送数据慢,因此需要把待发送的数据写入缓冲区
- 如果没有缓冲区的话,此时我们可以选择原地等待。等到socket的缓冲区可写。那么,此时程序的控制权就不在event loop了。
- 如果没有缓冲区的话,也可以不原地等待,而是直接将控制返回给event loop。那么,此时,剩下的20KB怎么办?
- 那么,就需要有一个outputBuffer,来接管这20KB。
- 程序通过TCP连接发送100KB数据,但是在write调用中,os只接受了80KB。
- 关于责任:使用网络库的人不应当去关心如何发送数据,将数据发送了几次,应当只负责将数据交给网络库去发送(muduo中是提供TcpConnection::send()给user)。
- user —data—> tcpConnection –send–> socket
- 因此,网络库中需要有一个outputBuffer,来接管上文中user生成的,网络库未发送完的20KB数据。
- 场景:
- Buffer设计逻辑:
- 感觉很合理啊。
- 对于每个TcpConnection都有一个outputBuffer
- user想要发送XXKB。(通过调用TcpConnection::Send())
- 如果outputBuffer中没有数据的话,那么直接发送。
- 如果一次发送完,那么并不需要将数据加入outputBuffer,也不需要在Poller上注册EPOLLOUT事件。
- 如果发送完还剩20KB的话,TcpConnection将未发送完的数据20KB保存在outputBuffer中,并再Poller上给fd注册POLLOUT事件。
- 之后,如果socket缓冲区变得可写,那么,event loop就会监测到该事件,并调用TcpConnection::handleWrite() 将outputBuffer中的剩余数据发送出去。
- 当outputBuffer中的数据写完时,Poller就停止监听POLLOUT。避免busy loop。
- 如果outputBuffer中有数据的话,那么不直接发送,而是先将这xxKBappend到outputBuffer的20KB的data之后,等到socket可写时,一并写入socket。而不是直接发送。
- 当该TcpConnection的Buffer中还有数据未写,也即Poller上仍监听该fd的POLLOUT事件时,如果程序想要关闭连接。那么,不会立刻关闭,而是先等到outputBuffer中的数据全都写完,再关闭链接。
- 见TcpConnection::shutdown()
1
2
3
4
5
6
7
8
9
10
11
12
13
14// 保证在IO thread中调用
// 当outputBuffer里面没有待发送数据时 关闭本connection的socket写端
// 当outputBuffer里面有待发送数据时 等handleWrite将outputBuffer全部写出之后 handleWrite会调用shutdownInLoop
void TcpConnection::shutdownInLoop()
{
// outputBuffer中的bytes已经全部发送完成
if(!channel_->isWriting())
{
socket_->shutdownWrite(); // 关闭写端
}
// 如果outputBuffer没全部发送
// 之后也会调用到shutdownInLoop
// 见handleWrite
}
- 见TcpConnection::shutdown()
- 如果outputBuffer中没有数据的话,那么直接发送。
inputBuffer的必要性
概述:socket从网卡中读来的数据 都会由TcpConnection先读入inputBuffer。然后,将inputBuffer交由user来处理。user拿到inputBuffer时,里面就已经有了这次读到的所有数据。
必要性
场景:接收方需要接受2KB数据
- 一次性收到2KB
- 分两次收到,一次600B,一次1400B
- 分两次收到,一次1KB
- ….
- 所以
关于责任
- 网络库在处理socket可读事件时,必须一次性把socket 接收缓冲区里的数据读完,user从网络库的Buffer里面取数据即可。(从kenrel的Buffer搬到应用层的Buffer),否则会反复触发POLLIN事件,造成busy-loop。
- 如果网络库不提供缓冲区,仅仅通知user某个socket可读/可写. 或者只提供一个固定大小的缓冲区如char 8192
- 那么user需要自己负责读取所有数据到user的应用层buffer。不方便。(如果user自己的buffer不够大,那么就需要读很多次)
网络库必然会遇到 “数据不完整” 的情况,如上,分多次收到一批数据。应对:收到的数据先放到inputBuffer中,等构成一条完成的消息再通知程序的业务逻辑。[这个实现还没看。应当是7.3节]
- 网络库在处理socket可读事件时,必须一次性把socket 接收缓冲区里的数据读完,user从网络库的Buffer里面取数据即可。(从kenrel的Buffer搬到应用层的Buffer),否则会反复触发POLLIN事件,造成busy-loop。
Buffer设计思路
- user不会自己操作read socket,而是操作TcpConnection的inputBuffer。
- 与user间接outputBuffer不同,user可以直接操作inputBuffer。
- user在onMessage()中,收到的data已经被存储在inputBuffer中,user可以直接操作inputBuffer。
void onMessage(const TcpConnectionPtr& conn,Buffer *buffer,Timestamp time)
inputBuffer相关代码
- connfd刚被accept,就会将其读事件注册在Poller上。channel的callback就是TcpConnection::handleRead().
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32void TcpConnection::connectionEstablished()
{
// 给新建立的connfd 注册读事件到Poller
channel_->enableReading();
}
// 设置回调
channel_->setReadCallback(std::bind(&TcpConnection::handleRead,this,std::placeholders::_1));
// 注册给channel 这个函数最终会被ioloop调用
void TcpConnection::handleRead(Timestamp receiveTime)
{
int savedErrno = 0;
// 将fd中可读的数据 读到inputBuffer中
ssize_t n = inputBuffer_.readFd(channel_->fd(),&savedErrno);
LOG_INFO("handle read %lu bytes",n);
if(n>0)
{
messageCallback_(shared_from_this(),&inputBuffer_,receiveTime);
}
else if(n == 0)
{
handleClose();
}
else
{
errno = savedErrno;
LOG_ERROR("TcpConnection::handleRead");
handleError();
}
}
outputBuffer相关代码
- user通过调用TcpConnection::send()来发送data。
- user -> send -> sendInLoop。sendInLoop中的逻辑见上文中outputBuffer的设计逻辑。
- poller监测到write事件时,loop会调用TcpConnection::handleWrite 将outputBuffer中的数据发送。
- TcpConnection.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180// send -> sendInLoop
void TcpConnection::send(const std::string &buf)
{
// LOG_INFO("send bytes buf %s",buf.c_str());
int n = buf.size();
const char *str = buf.c_str();
LOG_INFO("test send %d : %s",n,str);
if(state_ == kConnected)
{
// sendInLoop
// 会根据 output queue中是否还有未发送数据
// 来决定是直接发送buf 还是先将buf送入outputBuffer 再发送
if(loop_->isInLoopThread())
{
sendInLoop(buf.c_str(),buf.size());
}
else
{
loop_->runInLoop(std::bind(
&TcpConnection::sendInLoop,
this,
buf.c_str(),
buf.size()
));
}
}
}
/*
发送数据:
应用写的快 而内核发送数据慢
需要把待发送的数据写入缓冲区
*/
void TcpConnection::sendInLoop(const void *data,size_t len)
{
ssize_t nwrote = 0;
size_t remaning = len;
bool faultError = false;
// 之前调用过该connection的shutdown 不能再进行发送了
if(state_ == kDisconnected)
{
LOG_ERROR("disconnected give up writing!");
return ;
}
// 如果output queue(outputBuffer_) 中没有任何数据的话,那么就直接发送给fd。
// 表示channel第一次开始写数据 而且缓冲区没有待发送数据
if(!channel_->isWriting() && outputBuffer_.readableBytes()==0)
{
LOG_INFO("直接将data发送,而不是先送入outputBuffer");
// output queue中没有待发送数据 因此 直接将data写给fd
nwrote = ::write(channel_->fd(),data,len);
LOG_INFO("nwrote = %d",nwrote);
if(nwrote >= 0)
{
// 是否将data都发送完
remaning = len - nwrote;
if(remaning == 0 && writeCompleteCallback_)
{
// 如果发送完了 那么就调用一下writeCompleteCallback_
// 既然在这里 数据全部发送完成 就不用再给channel 设置 epollout事件了
loop_->queueInLoop(
std::bind(writeCompleteCallback_,shared_from_this())
);
}
// else ?
// 没发送完的话不是应当write more data 吗 ?
// 为什么没处理 ?
// 在这个if外面处理了
}
else // nwrote < 0
{
nwrote = 0;
if(errno != EWOULDBLOCK)
{
LOG_ERROR("TcpConnection::sendInLoop error");
// 对方socket重置
// server收到
// SIGPIPE
// RESET
if(errno == EPIPE || errno == ECONNRESET)
{
LOG_INFO("errno == %s",strerror(errno));
faultError = true;
}
}
}
}
// 说明当前这一次write并没有把数据全部发送出去
// 剩余的数据需要保存到缓冲区当中
// 然后给channel注册epollout事件
// poller发现tcp的发送缓冲区有空间
// 会通知相应的sock->channel
// 调用writeCallback_ 回调方法
// 也就是TcpConnection::handleWrite方法 把发送缓冲区的数据全部发送完成为止
if(!faultError && remaning > 0)
{
LOG_INFO("data没能全部发送走 因此将剩余的data放入outputBuffer");
// 目前发送缓冲区剩余的待发送数据的长度
size_t oldlen = outputBuffer_.readableBytes();
if(oldlen + remaning >= highWaterMark_
&& oldlen < highWaterMark_
&& highWaterMarkCallback_)
{
loop_->queueInLoop(
std::bind(highWaterMarkCallback_,shared_from_this(),oldlen + remaning)
);
}
// 将data中没有发送的数据 存入outputBuffer
outputBuffer_.append((char*)data+nwrote,remaning);
// 这里一定要注册Channel的写事件
// 否则poller不会给channel通知epollout
if(!channel_->isWriting())
{
LOG_INFO("将fd = %d的写事件 注册给poller",channel_->fd());
channel_->enableWriting();
}
}
}
channel_->setWriteCallback(std::bind(&TcpConnection::handleWrite,this));
// outputBuffer中有未发送数据 且 tcp socket的写缓冲区可写的时候 loop会执行这个call back
void TcpConnection::handleWrite()
{
LOG_INFO("fd = %d , outputBuffer data size = %ld ",channel_->fd(),outputBuffer_.readableBytes());
// channel_ 封装的fd 有注册在epoll上的writing事件
if(channel_->isWriting())
{
int savedErrno = 0;
// outputBuffer中的bytes 写入 可写的fd
ssize_t n = outputBuffer_.writeFd(channel_->fd(),&savedErrno);
if(n > 0)
{
// 将已经写出的bytes取出 :readerIdx向前移动nbytes
outputBuffer_.retrieve(n);
// 如果已经没有数据可以从buffer中取出 可取bytes : [readerIdx,writerIdx)
// 即buffer中的数据都已经写给fd
// 那么我们需要立刻停止epoll对writable事件的监听
// !=0的话 代表buffer中还有可写数据 那么不必取消监听。epoll稍后应当检测到可写事件 然后继续调用本函数进行写
if(outputBuffer_.readableBytes() == 0)
{
// 取消监听!!!
channel_->disableWriting();
// 执行写完成回调
if(writeCompleteCallback_)
{
// 唤醒loop对应的thread线程 执行回调
// 这个writeCompleteCallback 是什么?
loop_->queueInLoop(
std::bind(writeCompleteCallback_,shared_from_this())
);
}
// 如果全部outputBuffer里面的数据全部写出 并且server已经要求关闭这个连接,但是之前由于outputBuffer里面有要写的数据 因此没有关闭连接 的话
// 那么 此时 数据全部写出 就要调用刚才没执行的shutdownInLoop()
if(state_ == kDisconnecting)
{
// 关闭socket的write方向
shutdownInLoop();
}
}
else
{
LOG_INFO("I'm going to write more data");
}
}
else
{
LOG_ERROR("TcpConnection : outputBuffer write error\n");
}
}
else
{
LOG_ERROR("TcpConnection fd = %d is down , no more is writing\n",channel_->fd());
}
}
Buffer实现
结构
- 结构如下
- readable和writable都是针对Buffer来说的。
- readable : 可以从Buffer中读出的数据
- writable : 还可以向Buffer中写入
- prependable = readInx
- readable = writeIndex - readIndex
- writable = buffer_.size() - writeIndex
接口
- Buffer中的readable writeable都是针对Buffer来说的。
- 可从buffer中读出(读出来给fd) ;可向buffer中写入(从fd中读出来写给buffer)
- [readerIdx,writerIdx) 有效数据 可从buffer中读出
- [writerIdx,size) 剩余的可向buffer中写入的byte数
- Interface for user
- readFd fd : 用户使用readFd 从fd读取数据 输入给Bufferd的writable区域[writeIdx,size)
- writeFd fd : 用户使用writeFd 将Buffer中的readable数据 全部输入给fd文件
- retrieveAllAsString : 用户从buffer中拿走全部readable bytes。以string形式获得。会改变readIdx
- append : Buffer使用者 通过append 向Buffer中增添数据。buffer数据增加最终只有一种方式:append (可能是通过readFd调用,也可能是直接调用)
- Important memeber Function
- makeSpace : 无需user自己调用。扩容/腾挪。确保有nbytes的writable
- retrieve : 无需user自己调用。清除Buffer中已经被读走的bytes。(通过移动readerIdx)
- user不必关心Buffer的大小。Buffer是自适应扩张的。
- Buffer会自己扩充内存 管理writeIdx readIdx
- 因此 user只需要负责 读数据到buffer,从buffer中拿出数据。