
一言以蔽之
lab2实现的tcp receiver
- (0) 转换stream_idx ,abs_seq ,seq(属于内部功能)
- (1) 利用 StreamReassembler对 接收到的segment进行重组 并压入bytestream
- (2) 填充receive_window的相关信息:ackno , window_size, 从而实现flow-control流量控制
- (3) 根据 接收到的报文 对tcp receiver进行状态划分. listen syn_recv fin_recv
我们的receiver只实现了234
背景 & Receiver功能 & 分析

lab0实现了ByteStream,lab1实现了StreamReassembler,lab2要求我们实现TCPReceiver
TCP是一个在不可靠数据报之上 可靠地传输 可控制的字节流。TCP连接有两方参与,每一方都同时扮演着sender和receiver的角色。这两方被称为TCP连接的端点endpoint或者对等方peers
TCP receiver
- (1) 接收对端发来的segment
- (2) 使用我们之前实现的StreamReassembler将segment重组进ByteStream
- (3) 计算出receive_window的相关信息:ackno以及receive_window size. 随output segment被传输给对端
- receive_window:
- 含义:receiver感兴趣的、愿意接受的字节的下标范围,也是允许 tcp sender 发送的字节范围。
- 作用:tcp receiver使用该receive window,可以进行流量控制,使得sender限制自己的发送量,直到receiver可以接收更多data。(Using the window, the receiver can control the flow of incoming data, making the sender limit how much it sends until the receiver is ready for more.)
- ackno : 即tcp header中的acknowledgement number字段。
- 即 lab1中所实现的receive_window [first_unassembled,first_unacceptable)的左边界
- 即 ackno = first_unassembled
- 含义如下
- receiver期望从另一方收到的下一个字节的序号,(接收该字节可以使得receiver重组更多的字节流)
- 累计确认
- ack number:已经收到了[begin,ack-1]的所有bytes,期待收到的下一个byte是ack 。 即期待收到的下一个TCP报文段的body的起始字节的序号是ack
- window_size : 即TCP Header中的window size字段
- 即lab1中所实现的receiving_window的大小
- receive_window size + stream size = capacity
- receive_window size = first_unacceptable - first_unassembled


- receive_window:
为什么要实现这些?
- 这些信号对实现TCP的功能至关重要,为了TCP能够在不可靠的数据报网络之上提供流量控制、可靠的字节流。
- TCP中,ack意味着,接收方所期待的下一个字节,使其能够重组更多的bytestream。它告诉sender应该发送什么字节
- Flow Control流量控制意味着 receiver感兴趣的、愿意接受的字节的下标范围。这告知了sender其被允许发送多少字节。
index 关系
seqno , absolute seqno , stream idx
A 64-bit index is big enough that we can treat it as never overflowing.
- Transmitting at 100 gigabits/sec, it would take almost 50 years to reach 2^64 bytes. By contrast, it takes only a third of a second to reach 2^32 bytes
seqno
- 32bits , wrapping
- tcp 的 stream 可以任意长
- start from isn(Initial Sequence Number)
- TCP sequence numbers start at a random value.
- To improve security and avoid getting confused by old segments belonging to earlier connections between the same endpoints, TCP tries to make sure sequence numbers can’t be guessed and are unlikely to repeat. So the sequence numbers for a stream don’t start at zero.
- Include SYN/FIN
- The logical beginning and ending each occupy one sequence number
- SYN报文 代表 beginning of the stream , 占据ISN seq,
- FIN报文 代表 end of the stream , 占据最后一个seq
- SYN和FIN之间的每个byte都占据一个seq
- 32bits , wrapping
absolute seq :
- 不考虑(isn)initial seq num,即seq - isn。(即start at 0)
- 64bit , non-wrapping。
- 包含SYN/FIN。
stream idx :
- StreamReassembler中,stream中每个字节的idx。
- start at 0
- 64bit , non-wrapping。
- 不包含SYN/FIN
seqno , absolute seqno , stream idx之间的关系 关键图


转换:stream_idx <-> abs_seq <-> seq
易知stream_idx和abs_seq之间的转换
- assert(absolute seq > 0) : stream_idx = absolute seq - 1
abs_seq和seq之间的转换见下。
WrappingInt32 seq <-> abs_seq
WrappingInt32
- 实现absolute sequence number和sequence nunmber的转换
absolute sequence number -> sequence number
- WrappingInt32 wrap(uint64_t abs_seq, WrappingInt32 isn)
- abs_seq : seq = (abs_seq + isn) % 2^32
[63,32][31,0],舍去[63,32],转化成seq只需要其低32bit加上isn即可。- 因为seq的域是
[0,2^32-1],abs_seq每增加2^32,其对应的seq就回到起点的isn 对于任意isn,abs_seq : 0 / 2^32 / k * 2^32 + x ---> seq : isn + x如 abs_seq = 2^32 + 3 ---> seq = isn + 3 = 1 ,当isn = 2^32-2时,seq = 11
2
3
4WrappingInt32 wrap(uint64_t abs_seq, WrappingInt32 isn) {
return isn + static_cast<uint32_t>(abs_seq);
// 该加法(底层就是uint32位加法)会自动对2^32取模
}
sequence number -> absolute sequence number
- uint64_t unwrap(WrappingInt32 seq, WrappingInt32 isn, uint64_t checkpoint)
- Given a sequence number (n), the Initial Sequence Number (isn), and an absolute checkpoint sequence number, compute the absolute sequence number that corresponds to n that is closest to the checkpoint
- closet : 没有向上取最接近还是向下取最接近之分,无论上下,只要求最接近即可。
- 易知对于任意seq,其在
[0,2^64-1]的每个2^32大小的域上,都有一个abs_seq_num,wrap函数的目标就是返回其中最接近checkpoint的abs_seq_num - 思路:
- 先求出abs no 在域
[0,2^32-1]上的值为abs_base,易知目标abs_seq_num = abs_base + k * 2^32。
- 先求出abs no 在域
- 求出最接近的abs_seq_num ?
- 找到abs_base和checkpoint的距离offset:令checkpoint在[0,2^32-1|域上对应的值 和 [0,2^32-1]域上的abs_base求差。|checkpoint(low 32bit) - abs_base|。
- a. 先假设checkpoint lowbit > abs_base。
- 然后讨论abs_seq的两种情况:
- 落在checkpoint左侧,abs_seq1 = check_point - offset ;落在checkpoint右侧,abs_seq2 = check_point + 2^32 - offset。如下图1
- 等价为 offset的两种情况。
- offset < 2^31 , then abs_seq1; offset >= 2^31 , then abs_seq2。
- (对于左右abs_seq和check_point距离相等时,我选择了右侧的check_point,因为我觉着从定义来讲,我们传入的check_point是接收窗口最左端,也即_first_unassembled,也即ackno,_first_unassembled左侧的字节都已经读完了进入bytestream,我们期待的,以及理论上应该接接收的是位于滑动窗口最左端往右的字节,故选择右侧而非左侧。
- 然后讨论abs_seq的两种情况:
- b. 同理,当checkpoint lowbit < abs_base,情况如下图2。
- offset <= 2^31 , then abs_seq1 ; offset > 2^31 , them abs_seq2。

- seq -> abs_seq举例
1
2
3
4
5
6
7
8
9
10
11
12// 1. Unwrap the first byte after ISN
seq = 1 , isn = 0 , check_point = 0 -> closet abs_seq = 1 = seq - isn(逻辑)
test_should_be(unwrap(WrappingInt32(1), WrappingInt32(0), 0), 1ul);
// 2. Unwrap the last byte before the third wrap
seq = 2^32 - 2 , isn = 0 , check_point = 3 * 2^32 -> closet abs_seq = 3 * 2^32 - 2 = check_point - 2
test_should_be(unwrap(WrappingInt32(UINT32_MAX - 1), WrappingInt32(0), 3 * (1ul << 32)), 3 * (1ul << 32) - 2);
// 3. Non-zero ISN
seq = 2^32 - 1 , isn = 10 , check_point = 2 * 2^32 -> closet abs_seq = 3 * 2^32 - 1 - 10
test_should_be(unwrap(WrappingInt32(UINT32_MAX), WrappingInt32(10), 3 * (1ul << 32)), 3 * (1ul << 32) - 11);
// 4. Unwrap a non-zero ISN
seq = 16 , isn = 16 , check_point = 0 -> closet abs seq = seq - isn = 0
test_should_be(unwrap(WrappingInt32(16), WrappingInt32(16), 0), 0ul);- corner case : 对于下溢情况,选择非下溢的一侧
- seq = 2^32 - 1 , isn = 0 , check_point = 0 -> closet seq = 2 ^ 32 - 1
- 按照closet的逻辑,closet seq 有两个选择
- 位于check_point左侧,即 abs_seq = checkpoint(0) - 1 = 18446744073709551615
- 位于check_point右侧,即 abs_seq = checkpoint(0) + 2^32 - 1 = 2^32 - 1
- 若仅仅按照closet,则应当选择check_point左侧的abs_seq(18446744073709551615),但实际上,应当选择check_point右侧的abs_seq
- 原因如下:unwrap的意义
- 目的是将字节流中的seq,依据checkpoint,转化成正确的对应的abs_seq。
- 易知按照100 GB/sec的传输速度,要发送五十多年,才能发送到2^64bytes,可以看作abs_seq是不可能到达2^64级别的byte。
- 故本case中check_point(0)左侧的下溢情况应当为非法情况。应选择 abs_seq = 2^32 - 1
- 故:
- 当abs_seq位于check_point左侧 且 发生 下溢(到2^64bytes) 时,选择check_point右侧的abs_seq。
- 当abs_seq位于check_point右侧 且 发生 上溢(重新到0)时,正常计算、上溢取模即可。
1
2// corner case : 4. Big unwrap
test_should_be(unwrap(WrappingInt32(UINT32_MAX), WrappingInt32(0), 0), static_cast<uint64_t>(UINT32_MAX));
- seq = 2^32 - 1 , isn = 0 , check_point = 0 -> closet seq = 2 ^ 32 - 1
- corner case : 对于下溢情况,选择非下溢的一侧
- 实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33uint64_t unwrap(WrappingInt32 seq, WrappingInt32 isn, uint64_t checkpoint) {
// seqno -> absolute seq no
uint32_t base_abs = seq - isn;
if( base_abs >= static_cast<uint32_t>(checkpoint))
{
uint32_t offset = base_abs - static_cast<uint32_t>(checkpoint);
if(offset <= (1ul<<31))
{
return checkpoint + offset;
}
else
{
// corner case : 当下溢时 选择右侧 而不是左侧溢出到2^64
if(checkpoint < (WrappingInt32::_MOD - offset))
return checkpoint + offset;
return checkpoint - (WrappingInt32::_MOD - offset);
}
}
else
{
uint32_t offset = static_cast<uint32_t>(checkpoint) - base_abs;
if(offset > (1ul<<31))
{
return checkpoint + (WrappingInt32::_MOD - offset);
}
else
{
if(checkpoint < offset)
return checkpoint + (WrappingInt32::_MOD - offset);
return checkpoint - offset;
}
}
}
- uint64_t unwrap(WrappingInt32 seq, WrappingInt32 isn, uint64_t checkpoint)
TCP Receiver实现
报文信息 & 阶段
- 关于报文

- 蓝色:receiver需要读取、关注的
- seq , syn flag , fin flag , playload
- 红色:receiver需要填充的
- ackno , window_size
- 蓝色:receiver需要读取、关注的
- 复习一下三次握手四次挥手的图


- 将receiver划分为如下阶段。

- LISTENING : 还没接收到初始化ISN的SYN报文.(the begining of the stream)
- SYN_RECV : 已经接收SYN报文 && 还没接收FIN报文
- FIN_RECV : 已经接收到FIN报文且已经传入给bytestream。(即bytestream已经关闭)
1
2
3bool listening() { return !_isn.has_value(); }
bool syn_recv() { return _isn.has_value() && !_reassembler.stream_out().input_ended(); }
bool fin_recv() { return _reassembler.stream_out().input_ended(); }
- 再看一眼receive_window图

接口实现
ackno
- std::optional
ackno() const; - 注意:实现的ackno 是 累计确认 ackno
- 如分析中所述,seqno = receive window left edge = reciver第一个没被写入bytestream的字节。
- 注意以seqno的形式获取,而非stream_idx:stream_idx –> abs_seq –> seq
- 注意receiver在不同阶段,所获取的abs_seq不同,自然得出seq不同。
- LISTENING : 还没接收到第一个SYN报文,也即还没建立连接,即还没初始化isn。返回空即可。
- SYN_RECV : abs_seq = stream_idx + 1。因为SYN报文会占据abs_seq的起始索引.
- FIN_RECV : abs_seq = stream_idx + 2。因为SYN、FIN报文均会占据abs_seq索引位置。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17optional<WrappingInt32> TCPReceiver::ackno() const {
// 还没初始化isn,也即还没建立连接,还没接收到第一个SYN报文
if(_state == LISTENING)
{
return std::nullopt;
}
// +1 是为了第一个初始化syn报文 规定其占据一个字节的位置
else if(_state == SYN_RECV)
{
return wrap(_reassembler.first_unassembled() + 1,_isn.value());
}
// +2 : syn + fin
else if(_state == FIN_RECV)
{
return wrap(_reassembler.first_unassembled() + 2,_isn.value());
}
}
segment_received
- void TCPReceiver::segment_received(const TCPSegment &seg)
- 每次接收到对端发送的segment时,都会调用该segment_received
- 主要工作
- 接收第一个具有syn flag的报文时 , 初始化isn = seq.
- 对于data,push_substring(data).
- 接收fin报文(end-of-stream marker),代表该fin是该stream的最后一个字节.
- 注意
- 同一个报文中可能同时有syn flag和fin flag.
- state什么时候变成fin_recv , 是segment_received刚收到fin flag ,state就改变吗 ?
- 不是,此时可能fin flag只是位于receive_window中 ;当fin -> eof被加入到reassembler中的bytestream时(bytestream.end_input()),state才变成fin_recv
- 对于第一个建立连接,初始化isn的syn报文,其也有可能携带应用层报文数据,即(这和我之前学的三次握手不一样啊。我记着第一次握手的syn报文是连接初始化isn,没有携带应用层数据。
1
2test.execute(ExpectState{TCPReceiverStateSummary::LISTEN});
test.execute(SegmentArrives{}.with_syn().with_seqno(isn).with_data("Hello, CS144!").with_result(SegmentArrives::Result::OK)); - 对于invalid segment seq: ignore
- 如segment的seq == isn
1
2
3TCPReceiverTestHarness test{4};
test.execute(SegmentArrives{}.with_syn().with_seqno(isn).with_result(SegmentArrives::Result::OK));
test.execute(SegmentArrives{}.**with_seqno(isn)**.with_data("a").with_result(SegmentArrives::Result::OK)); // WRONG INDEX isn !
- 如segment的seq == isn
- 核心逻辑
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21void TCPReceiver::segment_received(const TCPSegment &seg) {
// 0. corner case such as invalid idx
if(corner(seg))
return ;
// 1. 首次确立isn
if(_state == LISTENING && seg.header().syn)
{
_isn = seg.header().seqno;
update_state();
}
// 2. 如果还没初始化isn,且该报文不是syn报文
if(_state == LISTENING)
return ;
// 3. seq -> abs_seq -> stream_idx
size_t abs_seq = unwrap(seg.header().seqno,_isn.value(),_reassembler.first_unassembled());
size_t stream_idx = abs_seq_to_stream_idx(abs_seq);
// 4. push_substring(segment)
_reassembler.push_substring(string(seg.payload().str()),stream_idx,seg.header().fin);
update_state();
}
window_size
- window_size() : [_first_unassembled , first_unacceptable())
1
2
3
4
5
6size_t window_size() const{
// _first_unassembled = _output.bytes_written();
// first_unacceptable() = _capacity + _output.bytes_read()
// return first_unacceptable() - _first_unassembled;
return _capacity + _output.bytes_read() - _output.bytes_written();
}
问题
- 值得后续 注意 & 疑问
- 可以看到 对于 receiver 只有第一个syn报文 的syn 可以被receiver识别到,后来的报文,即使syn为true,receiver也识别不到
- 可以看到,本lab中并没有要求做出ESTABLISHED状态,也并没有FIN_WAIT状态。留待观察,日后应该会实现。
- 流量控制 – 拥塞控制区别
- 已知流量控制 是 flow-controled ,是通过receiver的 receive_window 的ackno以及 window_size实现的。
- 那么拥塞控制呢???
- 并没有实现拥塞控制. 见lab3 以及 计网-拥塞控制 blog
case
- 添加示例case1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35===============my test3==============
capacity 4000
1000 0 0 shc
shc
/* segment with SYN + data + FIN */
{
TCPReceiver receiver(4000);
uint32_t isn = 1000;
// now LISTEN
Assert(not receiver.ackno().has_value());
TCPSegment seg;
seg.payload() = std::string("shc");
seg.header().ack = false;
seg.header().fin = true;
seg.header().syn = true;
seg.header().rst = false;
seg.header().ackno = WrappingInt32(0);
seg.header().seqno = WrappingInt32(isn);
seg.header().win = 0;
receiver.segment_received(seg);
// already has received syn
Assert(receiver.ackno().has_value());
// now FIN_RECV
Assert(receiver.stream_out().input_ended());
Assert(receiver.ackno().value() == WrappingInt32(0 + 3 + 1 + 1 + isn)); // syn(0) + "shc"[123] + fin(4) -> 5 + isn = 1005;
Assert(receiver.unassembled_bytes() == 0);
cout<<receiver.stream_out().read(receiver.stream_out().buffer_size())<<endl; // shc
// stream empty && closed
Assert(receiver.stream_out().eof());
//////// bytestream ////////// ////// recv_window //////
//////syn shc ////////// ////// fin empty //////
// abs_seq //////0 123 ////////// ////// 4 5 //////
// seq = abs_seq + isn
} - 添加示例case0 : 普通通信
- syn报文 , data报文 , fin报文
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99======================my test 02==============
capacity 200
5 0 0 abcdefg
14 9 8 abc
13 8 7 a
abcdefgaabc
17 12 11
{
cout<<"======================my test 02=============="<<endl;
TCPReceiver receiver(200);
// window size
Assert(receiver.window_size() == 200);
// tcp segment abcdefg , syn isn = 5
{
TCPSegment seg;
seg.payload() = std::string("abcdefg");
seg.header().ack = false;
seg.header().fin = false;
seg.header().syn = true;
seg.header().rst = false;
seg.header().ackno = WrappingInt32(0);
seg.header().seqno = WrappingInt32(5);
seg.header().win = 0;
receiver.segment_received(std::move(seg));
Assert(receiver.ackno().has_value() == true);
Assert(receiver.stream_out().bytes_written() == 7);
Assert(receiver.ackno() == WrappingInt32(13));
//////// bytestream ////////// ////// recv_window //////
//////// abcdefg ////////// ////// empty //////
// abs_seq ////////01234567 ////////// ////// 8 //////
// seq = abs_seq + 5
}
{
// abc , seqno = 14
TCPSegment seg;
seg.payload() = std::string("abc");
seg.header().ack = false;
seg.header().fin = false;
seg.header().syn = false;
seg.header().rst = false;
seg.header().ackno = WrappingInt32(0);
seg.header().seqno = WrappingInt32(14);
seg.header().win = 0;
receiver.segment_received(std::move(seg));
Assert(receiver.stream_out().bytes_written() == 7);
Assert(receiver.unassembled_bytes() == 3);
//////// bytestream ////////// ////// recv_window //////
//////// abcdefg ////////// ////// abc //////
}
{
// a , seqno = 13
TCPSegment seg;
seg.payload() = std::string("a");
seg.header().ack = false;
seg.header().fin = false;
seg.header().syn = false;
seg.header().rst = false;
seg.header().ackno = WrappingInt32(0);
seg.header().seqno = WrappingInt32(13);
seg.header().win = 0;
receiver.segment_received(std::move(seg));
Assert(receiver.stream_out().bytes_written() == 11); // abcdefgaabc
Assert(receiver.unassembled_bytes() == 0);
Assert(receiver.ackno() == WrappingInt32(17)); // syn(0) + abcdefgaabc[1,11] -> 12 + isn(5)
ByteStream stream = receiver.stream_out();
cout<<stream.read(stream.buffer_size())<<endl;
//////// bytestream ////////// ////// recv_window //////
//////// abcdefgaabc ////////// ////// empty //////
// abs_seq //////0 1234567891011////////// ////// 12 //////
// seq = abs_seq + 5
}
{
// "" , fin , seqno = 17
TCPSegment seg;
seg.payload() = std::string("");
seg.header().ack = false;
seg.header().fin = true;
seg.header().syn = false;
seg.header().rst = false;
seg.header().ackno = WrappingInt32(0);
seg.header().seqno = WrappingInt32(17);
seg.header().win = 0;
receiver.segment_received(std::move(seg));
Assert(receiver.stream_out().bytes_written() == 11); // abcdefgaabc
Assert(receiver.unassembled_bytes() == 0);
Assert(receiver.ackno().value() == WrappingInt32(18)); // syn(0) + abcdefgaabc[1,11] + fin[12] -> 13 + isn(5)
//////// bytestream ////////// ////// recv_window //////
//////// abcdefgaabc ////////// ////// fin empty //////
// abs_seq //////0 1234567891011////////// ////// 12 13 //////
// seq = abs_seq + 5
}
}
- syn报文 , data报文 , fin报文

