Lab1 StreamReassembler

TCP managed to produce a pair of reliable in-order byte streams (one from you to the server, and one in the opposite direction), even though the underlying network only delivers “best-effort” datagrams.

You also implemented the byte-stream abstraction yourself, in memory within one computer.

Over the next four weeks, you’ll implement TCP, to provide the byte-stream abstraction between a pair of computers separated by an unreliable datagram network.

我们的任务是实现一个StreamReassembler。它的具体功能相信看下数据传输路径就很明了了:

receiver的数据传输路径:network → StreamReassembler整流 →(write)ByteStream(read)→ app

感想

先放个通关截图在这。

image-20230225192145829

这个实验我前前后后总共做了大概有9h+……写我下面放上来的屎山代码可能大概用了5h+。我总共使用了140+行代码实现我的核心函数push_substring

整个过程,包括思路和代码都十分复杂,但最后的表现相比于别人好像也没好到哪去,让我不禁怀疑自己是不是想错了……以及,这样的复杂性也给我带来很多担忧,担心会不会在以后的实验因为这个的bug而寄,毕竟我在写笔记的同时都已经找到了不止一个bug了()希望人没事。

总而言之,先把我的思路和一步步的代码拆解放上来吧。

后记:

不得不说,这个东西太健壮了,给后面的TCPReceiver省去了好多功夫……

比如说,TCPReceiver无需考虑ack怎么算,因为这里就帮你算好了;TCPReceiver无需考虑数据包重叠或者重复,因为这里已经考虑到这个情况了;TCPReceiver无需担忧FIN是否会因为容量满丢弃一部分数据而未达到真正的FIN,只需调用其相关接口判断就行。

它虽然帮助了TCPReceiver那么多,但很神奇的是,它们的耦合性并不高。你把StreamReassembler单独拆出来看,左看右看,它都确实仅仅只是一个健壮的区间合并算法

这得益于实验设计的精良,得益于设计TCPReceiver时的野心。这些边界情况都这么麻烦,而且都只与区间合并有关,那么我们为什么不直接把它抽象进区间合并进行处理呢?这种想法极富胆识,事实证明最后也确实能实现。这种设计理念让我受益很深。

为什么我的思路那么复杂

看了感恩的代码,我发现我俩的大题思路其实差不多是一模一样的,都是先进行两轮的区间合并,然后再处理但为啥我看起来就那么复杂呢?

一是题意理解问题。

我发现他对_capacity的理解跟我的理解不一样emmm……

额好像怪我没认真看指导书的图。

image-20230225232723083-1677339210527-2

我理解的capacity:绿色部分和红色部分的净含量

似乎是真正的capacity:绿色部分+红色部分+空部分,也就是说capacity只是一个跟index差不多的索引下标…………

1
2
3
4
 // out of bound
if (start >= hope_to_rec + _capacity - _output.buffer_size()) {
return;
}

其实这也是很合理的,很符合其语义。我这样错误理解的话,相当于是内存开销增大了,丢包减少了。不过暂时等之后寄掉了再回来改吧。

UPDATE: 确实寄掉了,并且已经改过来了,也不复杂,只需要添加对right边界的处理就行。【指去掉超出start+capacity的部分。】

二是代码规范问题。

首先他代码规范性强,看起来非常舒服。其次他会用类似upper_bound()这样的函数(反观我压根没想起来),这样就显得比我的循环简洁了很多很多。

三是设计问题。

他用的是map我用的是set。确实是map比较合理,它既有find功能也兼具了有序的特性。

我的思路

我们要做的,是将零散的数据包拼接成完整的字节流,并且将整流过的数据送入ByteStream中,这通过核心函数push_substring实现。我们可以先来看看push_substring的定义:

1
void StreamReassembler::push_substring(const string &data, const size_t index, const bool eof) ;

data为数据包,indexdata中第一个字符在整个字节流的下标,eof用来标识这是字节流的最后一个数据包。

详细说明:

比方说有字节流“abcdefg”,则合法的参数对有如:{“abc”,0,0},{“cdef”,2,0},{“g”,6,1}

通俗来说,我们这个函数的功能就是,把一堆起始下标为indexdata(无序、可能重叠)拼接为一个完整的字节流。

听起来有没有觉得很耳熟?是的,我认为这正是“区间合并”问题。我接下来便通过区间合并的思想,对问题进行如下数据结构以及算法的设计。

数据结构

区间

由于是区间合并问题,所以就先需要定义区间。

1
2
3
4
5
6
struct node {
size_t left; // 当前data位于总体的左index(闭)
size_t right; // 右index(开)
string data;
bool operator<(const node &b) const { return left < b.left; }
};

集合

我们需要维护一个左端点升序的区间集合,故使用内部红黑树实现的有序集合set。

1
set<node> buffer;   // 存储结构

算法

我们要做的,是对数据包进行整流,并且把整流过的部分输送到ByteStream中。由于存储结构存在_capacity的上限,因而,我们需要尽可能早地把存储结构中已经整流好的数据送入ByteStream中。

那么,如何定义“已经整流好的数据”呢?它需要满足跟“之前已经整流好了的数据”的有序性,也即,比方说[0,1000]已经整流完毕送入app,那么下一个送入app的数据一定满足index=1001

因而,我们可以维护一个变量left_bound,表示下一个将被app接受的数据的index(如上例的1001)。为了达到“尽早”目的,我们需要在每次push_substring执行完区间合并之后,检查buffer的第一个区间的左端点是否与left_bound相等,是的话则将第一个区间写入ByteStream,不是的话就什么也不做。

因而,在push_substring中,对于一个新来的数据包,我们大致需要进行以下几步:

  1. 将参数所给的区间( [index, index+data.length()) )并入区间集合buffer
  2. 查看是否需要ByteStream

区间合并

问题定义

问题可抽象为:

给定一个有序区间集合buffer,以及一个小区间node,你需要把node塞进buffer里。

Example: buffer = {[1,3),[5,7)} , node = [6,8) 输出:buffer = {[1,3), [5,8)}

算法思路

判断区间重叠统一只检查左端点。注意,两次重叠的判断条件不一样,是因为相对性发生了改变。第一次相当于node的左端点在buffer[i]中,第二次相当于buffer[i]的左端点在node中。

  1. buffer进行第一轮扫描

    如果node与buffer[i]产生重叠((left >= it->left && left <= it->right)),那么更新node为node∪buffer[i],并且将buffer[i]从buffer中删去。

    在第一次找到重叠的区间,就应该break退出第一轮循环。

  2. buffer进行第二轮扫描

    如果node与buffer[i]产生重叠( (it->left >= left && it->left <= right)),那么更新node为node∪buffer[i],并且将buffer[i]从buffer中删去。

我们在合并区间时,不仅需要对struct node的左端点left和右端点right进行更新,还需要对其数据域data也进行合并拼接。我们维护变量res作为维护的目标区间的数据域。对于res,我们应该进行如下操作:

  1. 初始化为data

  2. 除去[left, left_bound)这一区间内的数据

    这部分数据我们已经整流过并且写入ByteStream

  3. 在两轮合并中对其进行正确拼接

图解

image-20230225200605175

代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
size_t left = index, right = index + data.length();  // 初始化左右区间
size_t o_left = left, o_right = right; // keep the original value
node tmp = {0, 0, ""};

if (right < left_bound) return; // must be duplicated
left = left < left_bound ? left_bound : left; // 左边已经接受过的数据就不要了
string res = data.substr(left - o_left, right - left);// 掐头

/* 开始区间合并。需要扫描两次 */
// 第一次
for (auto it = buffer.begin(); it != buffer.end(); it++) {
if (left >= it->left && left <= it->right) { // 区间重叠
size_t r = right,l = left;
// 更新左右端点
right = max(right, it->right);
left = min(left, it->left);
if (r <= it->right) // 如果目标区间被包裹在it内
// res需要更新为it头+data掐头后的全长+it尾,也即将it中间重叠部分用data替换
res = it->data.substr(0, l - it->left) + data.substr(l - o_left) +
it->data.substr(r - it->left, it->right - r);
else
res = it->data.substr(0, l - it->left) + data.substr(l - o_left);
// 删除原来的结点
buffer.erase(it);
break;
}
}

// 第二次
for (auto it = buffer.begin(); it != buffer.end(); it++) {
if (it->left >= left && it->left <= right) {
if (it->right <= right);// it这个区间被包含在目标区间内,则什么也不做
else {
// 需要加上it的尾
res += it->data.substr(right - it->left, it->right - right);
// 更新右端点
right = it->right;
}
// 删除
buffer.erase(it);
}
}
// 将维护区间插入区间集合
tmp = {left, right, res};
buffer.insert(tmp);

写入ByteStream

思路

我们需要检查buffer内的第一个区间,如果其左端点与left_bound相等,则把第一个区间填入ByteStream,然后更新left_bound,从buffer中删去该区间;如果不相等(只可能是left > left_bound)则什么也不做。

在把区间数据填入ByteStream的过程中,可能造成ByteStream满。因而我们就只能填入第一个区间内的一部分数据,更新left_bound,将第一个区间的剩余数据继续存在buffer中。

代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
auto iterator = buffer.begin();  
iterator = buffer.begin();
// write into the ByteStream
if (iterator != buffer.end() && iterator->left == left_bound) {
// 防止_output的容量超过
size_t out_rem = _output.remaining_capacity();
if (out_rem < iterator->data.length()) { // ByteStream剩余容量小于第一个区间长度
_output.write(iterator->data.substr(0, out_rem));// 写入尽量多数据
left_bound = iterator->left + out_rem;// 更新左边界
// 由于iterator只读,因而我们不能直接修改其左端点和data域
tmp = {left_bound, iterator->right, iterator->data.substr(out_rem)};
buffer.erase(iterator);
buffer.insert(tmp);
} else {
_output.write(iterator->data);
left_bound = iterator->right;
buffer.erase(iterator);
}
}

buffer的最大容量_capacity

背景

维护“存储结构的容量不超过capacity”这个不变性条件可以说是这个实验最恶心最难的地方……也正是它,让我的代码写成了一坨shit山()

为什么说它最难最恶心呢?其实它本来也许不算难,但在这个思路下想要保持这个不变性条件,就显得非常地困难。

一开始没过脑子的时候,我觉得这样就行:

1
2
3
void StreamReassembler::push_substring(const string &data, const size_t index, const bool eof) {
if(data.length() + unassemble_bytes() > capacity) return;
}

但是这样很明显有两个问题。

一是就算你超过了,你也不能直接丢弃掉data,得把没超过的部分填满。

二是,data.length() + unassemble_bytes()有时,甚至是很多时候,都不会是将data并入buffer之后buffer的容量。因为data和buffer很大概率会存在重叠区间。

那么,你能不能在区间合并完之后,再进行该不变性条件的判断,并且将没超过的部分填满,超过的部分丢弃呢?

答案是,也不能。因为经过两轮合并,你的data和buffer里原有的数据早已你中有我我中有你了,你无法在最后将它们分开,找出data超过capacity的数据并且丢弃它。

因而,头尾都不行的话,唯一的答案就是,我们只能在两轮区间合并中途,去时刻追踪当前容量是否超过capacity

这听起来就令人十分地头大。但事实证明,并不是无法实现的,坚持下去,就算是shit山也能跑起来()下面便是我的实现思路。

思路

维护一个变量remaining,表示当前还有多少容量。维护start,表示未判断是否可以写入buffer的数据起点。我们要做的事:

  1. 初始化remaining为capacity - 当前容量,start为掐头后的left
  2. 在第一轮循环中更新start
  3. 在第二轮循环中通过start和remaining来判断是否能够写入buffer。尽可能多地写入,把写入不了的部分丢弃。
例子

下面举个例子来说明整个流程。

1
2
initial:  
buffer = { [1,3], [5,8],[10,12],[13,14] } , capacity = 12, remaining = 12-9=3, data = [2,11], start = 2

start为2,是因为data的从2开始的这段数据还不知道能不能被成功存入buffer中。

第一次合并后,buffer = { [5,8],[10,12],[13,14] } ,data=[1,11],start = 3.

start为3,是因为[2,11]与[1,3]合并,由于[2,3]这段本来就在buffer中,因而可以不用占用remaining,但从3开始这段数据还不知道能不能成功存入buffer中。

在第二轮合并中,首先扫描到[5,8]。由于[start,it->left]也即[3,5]这段数据长度为2<remaining=3,故而这段数据可以存入buffer,remaining更新为3-2=1,start更新为it->right=8.

/*

注意,此处无需再对[9,10]进行同样的操作。对于这种情况:

1
buffer = { [1,3], [5,8] } , capacity = 12, remaining = 12-9=3, data = [2,11], start = 2

在循环结束的这里已经处理:

1
2
3
4
5
if (remaining < right - start) {
right = remaining + start;
if (eof == 1)
is_eof = false;
}

*/

然后扫描到[10,12]。由于[start,it->left]也即[8,10]这段数据长度为2>remaining=1,故而这段数据只能把[8,9]这部分存入buffer。因而,我们把到此为止的[1,9]结点存入buffer,剩下的[10,11]部分直接丢弃,也即直接跳入到最后的写入ByteStream部分。

代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
void StreamReassembler::push_substring(const string &data, const size_t index, const bool eof) {
size_t remaining = _capacity - unassembled_bytes(); // how much room left?

size_t left = index, right = index + data.length(); // 初始化左右区间
size_t o_left = left, o_right = right; // keep the original value

size_t start = left; // the begin of the unused data-zone of variable data
// if(remaining == 0) goto end; // buffer满

// 开始区间合并。需要扫描两次
for (auto it = buffer.begin(); it != buffer.end(); it++) {
if (left >= it->left && left <= it->right) {
right = max(right, it->right);
left = min(left, it->left);
// 说明目标区间完全被包裹,也即目标区间一定可以塞进buffer中
if (right == it->right) start = o_right;
// 说明仅仅部分重叠,去重部分从it->right开始
else start = it->right;
// ...
break;
}
}

// 第二次
for (auto it = buffer.begin(); it != buffer.end(); it++) {
if (it->left >= left && it->left <= right) {
// 比remaining满
// 第一个条件是为了防止unsigned溢出
if (it->left > start && remaining < it->left - start) {
// 截取能塞得下的部分
tmp = {left, start + remaining, res.substr(0, start - left + remaining)};
remaining = 0;
// 此时塞进去是肯定不重叠的,因为tmp.right < it->left
buffer.insert(tmp);
// 剩下的直接丢弃
goto end;
}
// 塞得下
remaining -= it->left - start;
start = it->right;
// ...
}
}

// 边界处理
if (remaining < right - start) {
// 扔掉塞不下的部分
right = remaining + start;
}
tmp = {left, right, res};
buffer.insert(tmp);

end:
iterator = buffer.begin();
// write into the ByteStream
// ...
}

capacity还没结束

背景

你以为做到上面那个小标题那样就万无一失了吗?答案是,并不!

我们还有哪里想得不周全呢?考虑这样一个案例,ByteStream未满,但是在更新remaining时发现buffer已满塞不下了。这时候,我们上面的做法是直接扔掉塞不下的部分。但其实,我们还可以查看buffer的一部分数据是否能够再塞进ByteStream,如果能的话,就又能省下一笔空间了!

所以,我们在发现remaining不够时,应该首先检查能不能塞一部分buffer的数据进入ByteStream中用来腾出空间。

代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
// 第二次
for (auto it = buffer.begin(); it != buffer.end(); it++) {
if (it->left >= left && it->left <= right) {
// 比remaining满
// 第一个条件是为了防止unsigned溢出
if (it->left > start && remaining < it->left - start) {
// 先看看能不能塞进ByteStream腾空间。需要满足两个条件
// buffer的第一个区间正好是left_bound
if (left == left_bound) {
size_t out_mem = _output.remaining_capacity();
// ByteStream有位置
if (out_mem > 0) {
// 腾出的空间很充足,完全可以不改变remaining
if (out_mem >= it->left - start) {
// 写入ByteStream
_output.write(res.substr(0, it->left - start));
// 更新
res = res.substr(it->left - start);
left_bound = it->left - start + left;
left = left_bound;
// 加上腾出的空间【在ok标签处减掉】
remaining += it->left-start;
goto ok;
} else {
// 空间不足以完全不改变remaining
_output.write(res.substr(0, out_mem));
res = res.substr(out_mem);
left_bound = out_mem + left;
left = left_bound;
// 加上腾出的空间
remaining += out_mem;
// 如果两个加起来就行,则ok
if(it->left>start && remaining>=it->left - start){
goto ok;
}
// 否则remaining依然不充足
}
}
}
// 截取能塞得下的部分
tmp = {left, start + remaining, res.substr(0, start - left + remaining)};
remaining = 0;
// 此时塞进去是肯定不重叠的,因为tmp.right < it->left
buffer.insert(tmp);
// 剩下的直接丢弃
goto end;
}
ok:
// 塞得下
remaining -= it->left - start;
start = it->right;
// ...
}
}
小细节

因而,在一开始发现remaining满的时候,不能直接goto end。

1
// if(remaining == 0)	goto end; // buffer满

因为还得看看ByteStream能不能腾空间。

eof

对于eof的处理也是需要注意的一个小细节。

我们不能这么写:

1
2
if (eof)
_output.end_input();

原因有二,一是最后一个数据包有可能是部分丢失,二是整流可能还未结束。

所以我们应该在成员变量中维护is_eof,记录是否收到过最后一个数据包,并且在最后一个数据包部分丢失的时候置它为false。当且仅当is_eof == true且buffer非空时,才能说明输入结束。

相关代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
void StreamReassembler::push_substring(const string &data, const size_t index, const bool eof) {
if (eof == 1)
is_eof = true;
// ...
// 第二次
for (auto it = buffer.begin(); it != buffer.end(); it++) {
if (it->left >= left && it->left <= right) {
// 此时空间不够,先尝试下能不能写入一部分到_output中
if (it->left > start && remaining < it->left - start) {
// ...
tmp = {left, start + remaining, res.substr(0, start - left + remaining)};
remaining = 0;
if (eof == 1)
is_eof = false;
buffer.insert(tmp);
goto end;
}
ok:
// ...
}

if (remaining < right - start) {
right = remaining + start;
if (eof == 1)
is_eof = false;
}
// ...
end:
// ...
// 满足两个条件才是真的eof
if (is_eof && buffer.empty()){
is_eof = false;
_output.end_input();
}
}

代码

头文件声明

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
class StreamReassembler {
private:
// Your code here -- add private members as necessary.
struct node {
size_t left; // 当前data位于总体的左index(闭)
size_t right; // 右index(开)
string data;
bool operator<(const node &b) const { return left < b.left; }
};
bool is_eof; // 文件的末尾是否接收成功
size_t left_bound; // 当前已经成功接收到left_bound之前的数据
set<node> buffer; // 存储结构

ByteStream _output; //!< The reassembled in-order byte stream
size_t _capacity; //!< The maximum number of bytes

public:
// ...

// used by the TCPReceiver
void set_is_eof() { is_eof = false; }
}

具体实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
#include "stream_reassembler.hh"
#include <iostream>
#include <map>

template <typename... Targs>
void DUMMY_CODE(Targs &&... /* unused */) {}

using namespace std;

StreamReassembler::StreamReassembler(const size_t capacity)
: is_eof(false), left_bound(0), buffer(), _output(capacity), _capacity(capacity) {}

void StreamReassembler::push_substring(const string &data, const size_t index, const bool eof) {
if (eof == 1)
is_eof = true;

size_t unass = unassembled_bytes();
size_t remaining = _capacity > unass ? _capacity - unass : 0; // how much room left?

size_t left = index, right = index + data.length(); // 初始化左右区间
size_t o_left = left, o_right = right; // keep the original value
auto iterator = buffer.begin(); // 这些变量在这里声明是为了防止后面goto报错
node tmp = {0, 0, ""};

if (right < left_bound) return; // must be duplicated
left = left < left_bound ? left_bound : left; // 左边已经接受过的数据就不要了
right = right <= left_bound + _capacity ? right : left_bound + _capacity; // 右边越界的也不要
o_right = right;
string res = data.substr(left - o_left, right - left);

size_t start = left; // the begin of the unused data-zone of variable data
if (data.compare("") == 0 || res.compare("") == 0) goto end; // 如果data是空串也直接不要
if (o_left >= left_bound + _capacity) goto end; // 越界的不要

// 开始区间合并。需要扫描两次
for (auto it = buffer.begin(); it != buffer.end(); it++) {
if (left >= it->left && left <= it->right) {
size_t r = right;
size_t l = left;
right = max(right, it->right);
left = min(left, it->left);
if (right == it->right) {
start = o_right;
} else {
start = it->right;
}
if (r <= it->right) {
res = it->data.substr(0, l - it->left) + data.substr(l - o_left) +
it->data.substr(r - it->left, it->right - r);
} else {
res = it->data.substr(0, l - it->left) + data.substr(l - o_left);
}
// 删除这一步很关键。
buffer.erase(it);
break;
}
}

// 第二次
for (auto it = buffer.begin(); it != buffer.end(); it++) {
if (it->left >= left && it->left <= right) {
// 此时空间不够,先尝试下能不能写入一部分到_output中
if (it->left > start && remaining < it->left - start) {
if (left == left_bound) {
size_t out_mem = _output.remaining_capacity();
if (out_mem > 0) {
// out的区域本身就很充足
if (out_mem >= it->left - start) {
// 写入ByteStream
_output.write(res.substr(0, it->left - start));
// 更新
res = res.substr(it->left - start);
left_bound = it->left - start + left;
left = left_bound;
remaining += it->left - start;
// out剩下的空位会在最后写入
goto ok;
} else {
// 光是out不够的话,那就先能腾多少空间腾多少
_output.write(res.substr(0, out_mem));
res = res.substr(out_mem);
left_bound = out_mem + left;
left = left_bound;
remaining += out_mem;
// 如果腾出空间加上原来空间足够,那就非常ok
if (it->left > start && remaining >= it->left - start) {
goto ok;
}
// 否则进入错误处理代码
}
}
}
tmp = {left, start + remaining, res.substr(0, start - left + remaining)};
remaining = 0;
if (eof == 1)
is_eof = false;
buffer.insert(tmp);
goto end;
}
ok:
remaining -= it->left - start;
start = it->right;
if (it->right > right){
res += it->data.substr(right - it->left, it->right - right);
right = it->right;
}
buffer.erase(it);
}
}

if (start < o_right && remaining < o_right - start) {
right = start + remaining;
if (eof == 1)
is_eof = false;
}
tmp = {left, right, res};
if (left < right)
buffer.insert(tmp);

end:
iterator = buffer.begin();
// write into the ByteStream
if (iterator != buffer.end() && iterator->left == left_bound) {
size_t out_rem = _output.remaining_capacity();
if (out_rem < iterator->data.length()) {
_output.write(iterator->data.substr(0, out_rem));
left_bound = iterator->left + out_rem;
tmp = {left_bound, iterator->right, iterator->data.substr(out_rem)};
buffer.erase(iterator);
buffer.insert(tmp);
} else {
_output.write(iterator->data);
left_bound = iterator->right;
buffer.erase(iterator);
}
}

// 满足两个条件才是真的eof
if (is_eof && buffer.empty()){
is_eof = false;
_output.end_input();
}
}

size_t StreamReassembler::unassembled_bytes() const {
// 可以跟上面的write合起来的,但在此处我采取了最保守的做法。
size_t res = 0;
for (auto it = buffer.begin(); it != buffer.end(); it++) {
res += it->right - it->left;
}
return res;
}

bool StreamReassembler::empty() const { return buffer.empty()&&is_eof; }