Lab3 TCPSender

前置知识

在TCP协议中,TCPSender负责对ack进行处理,将字节流封装为TCP报文,根据拥塞窗口的大小传输数据,以及管理超时重传。

我们的TCPSender需要做的是:

  1. 维护拥塞窗口

    image-20230228105405827

    我们需要通过ackno和window_size两个参数维护拥塞窗口的大小

  2. 填充拥塞窗口

    必须as possible。除非拥塞窗口满或者ByteStream空才不填。

    对于从ByteStream读出的数据,我们需要把其封装为一个TCPSegment再向_segment_out输出

  3. 记录哪一部分ack了,哪一部分没有ack

    我们需要在发送segment的同时暂存segment,当且仅当接收到ack,并且ack为segment.seqno+length的时候才能将其释放。

  4. 管理超时重传

    当对方超过一段时间还没有收到数据时,需要进行超时重传

    以segment为单位,一个segment重传具有原子性。

    在sender和暂存segment的数据结构中保存时钟滴答

特别的,指导书上有一段话表述得很有意思:

image-20230228110046088

这体现了TCPReceiverTCPSender之间的对偶关系,这种细节性的设计理念值得学习。

感想

写完TCPSender后我还是觉得有些迷茫……就跟TCPReceiver一样。说不出来具体是哪里不清楚,但总感觉隐隐约约有些怪怪的?总感觉相互之间接口有点混乱,对它们之间是怎么交互的一概不知。我想这是由于我们是自底向上实现TCP协议所带来的问题。希望这种感觉在实现完TCPConnection之后可以好转吧。

TCPReceiver的主要任务是把segment拼接成字节流,以及维护即将要告知TCPSender的ackno和拥塞窗口大小。而TCPSender的作用就是把字节流切成segment,并且根据ackno和拥塞窗口大小,进行数据的填充以及超时重传的管理。可以看到,它们是对偶的关系。

初见思路

看完指导书以及各种接口定义可以得知,我们需要:

  1. 增加成员变量

    1. window_size 拥塞窗口的大小

    2. ackono 记录当前收到的最大ackno

    3. ticks 记录sender从出生到现在的时钟滴答

    4. tmp_size 记录tmp_segments 中的数据字节数(注意算上SYN和FIN)

    5. tmp_segments 暂存segment,等待收到ack

      数据结构:

      list,自定义struct,结构体内有

      • TCPSegment
      • seqno 记录该segment的起始数据的seq
      • data_size 记录该segment携带数据的长度
    6. cons_retran 记录连续的超时重传次数

    7. syn 标记当前是否为第一个segment

    8. fin

    9. rto 记录当前的RTO

    10. timer_start 记录timer是否等待中

    11. timer_ticks 记录timer开启时的时间

  2. 实现一个定时函数

    第一次从bytestream取出数据包装为segment的时候(也即发送SYN报文)开启它,当所有data都收到ack的时候(也即FIN报文也被成功ACK)关闭它

    应该在ticks中被调用

    Every time a segment containing data (nonzero length in sequence space) is sent (whether it’s the first time or a retransmission), if the timer is not running, start it running so that it will expire after RTO milliseconds.

    当timer触发时,我们需要重传tmp_segments 队列头。

    如果空间足够,直接重传就行了,然后double RTO,然后用RTO reset timer,然后再次启动timer。

    如果空间不足够,只做上面那个的后两步,也即reset timer,然后再次启动timer。

  3. ack_received

    1. 更新window_size和ackno

    2. 重置超时重传

      如果接收到的ackno比以前的大,则重置RTO,重启timer(如果tmp_segments不为空),重置cons_retran

    3. 从tmp_segments中删除元素

    4. 调用fill_window

  4. fill_window

    如果window_size - tmp_size <= 0 或者 byte stream空,则什么也不做

    否则根据syn和fin标记创建一个new segment,然后写入out stream

    no bigger than the value given by TCPConfig::MAX PAYLOAD SIZE (1452 bytes)

    If the receiver has announced a window size of zero, the fifill window method should act like the window size is one.

细节补充

实现起来虽然很复杂,但思路确实很简单,正确思路和初见思路差不多,指导书写得很好很详细【以至于一开始我被指导书这么多内容给吓到了】。在这里只记录点实现过程中遇到的一些小错误以及我各个部分的实现细节补充。

timer实现

指导书的建议是实现一个类,但是我太懒了()而且确实这个timer的状态也很少,因而我就直接把它写在sender里面了。

SYN报文是否可以带数据

此实验未涉及这个。本次全部的测试用例都是SYN报文不携带数据的情况。【因为发出syn报文之后才将window_size设置为非0情况】

如果需要SYN报文不携带数据,可以在fill_window中把这句话:

1
if (!(_stream.buffer_empty() || remaining == 0)) {

修改为这句话:

1
if (!segment.header().syn&&!(_stream.buffer_empty() || remaining == 0)) {

代码

头文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
class TCPSender {
private:
// our initial sequence number, the number for our SYN.
WrappingInt32 _isn;

// outbound queue of segments that the TCPSender wants sent
std::queue<TCPSegment> _segments_out{};

// retransmission timer for the connection
unsigned int _initial_retransmission_timeout;// 初始的超时重传时间

// outgoing stream of bytes that have not yet been sent
ByteStream _stream;

// the (absolute) sequence number for the next byte to be sent
uint64_t _next_seqno{0};

struct OutSegment { // outstanding segment的包装类
TCPSegment segment;
uint64_t seqno;
size_t data_size;
};
std::list<OutSegment> tmp_segments{};// 内部存储结构
size_t tmp_size = 0;// 存储结构中含有的segment的总字节数

// 注意此处一定要初始化为1
size_t window_size = 1;// 拥塞窗口大小
uint64_t ackno = 0;// 最大的ackno
size_t ticks = 0;// 从出生到当前经过的时间

unsigned int cons_retran = 0; // 超时重传连续次数
unsigned int rto;// 当前超时重传时间
bool timer_start = false;// 超时重传timer是否开启
unsigned int timer_ticks = 0;// timer开启时的时间

bool syn = false;// 是否发送了SYN报文
bool fin = false;// 是否发送了FIN报文
public:
void send_empty_rst_segment();
void send_empty_ack_segment(WrappingInt32 t_ackno);
bool fully_acked() const { return _next_seqno == ackno; }

具体实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
TCPSender::TCPSender(const size_t capacity, const uint16_t retx_timeout, const std::optional<WrappingInt32> fixed_isn)
: _isn(fixed_isn.value_or(WrappingInt32{random_device()()}))
, _initial_retransmission_timeout{retx_timeout}
, _stream(capacity)
, rto{retx_timeout} {}

uint64_t TCPSender::bytes_in_flight() const { return tmp_size; }

// 尽可能地创造segment并且填充到segment output中
void TCPSender::fill_window() {
// should act like the window size is one
size_t t_win_size = window_size == 0 ? 1 : window_size;
size_t remaining = t_win_size - tmp_size;
// 防止数值溢出的情况
if (t_win_size < tmp_size)
remaining = 0;

// fill as possible
while (remaining > 0) {
// create and fill in a segment
TCPSegment segment = TCPSegment();
// 如果处于CLOSED状态
if (!syn) {
// 转移到SYN_SENT状态
// first segment
segment.header().syn = true;
segment.header().seqno = _isn;
remaining -= 1;
syn = true;
// should start the timer here
rto = _initial_retransmission_timeout;
timer_start = true;
timer_ticks = ticks;
}
// fill in the payload
if (!segment.header().syn && !(_stream.buffer_empty() || remaining == 0)) {
string data = _stream.read(min(remaining, TCPConfig::MAX_PAYLOAD_SIZE));
remaining -= data.length();
Buffer buf = Buffer(move(data));
segment.payload() = buf;
}

// 转移到FIN_SENT状态
if (_stream.eof() && !fin && remaining > 0) {
// last segment
segment.header().fin = true;
fin = true;
remaining -= 1;
}

// segment为空(不为SYN、FIN,也不携带任何数据)
if (segment.length_in_sequence_space() == 0)
break;

segment.header().seqno = wrap(_next_seqno, _isn);
_next_seqno += segment.length_in_sequence_space();
// push into the outstanding segments
tmp_segments.push_back(
{segment, unwrap(segment.header().seqno, _isn, _next_seqno), segment.length_in_sequence_space()});
tmp_size += segment.length_in_sequence_space();
// push into the segment out queue
_segments_out.push(segment);
}
}

void TCPSender::ack_received(const WrappingInt32 ack, const uint16_t wind_size) {
window_size = wind_size;
uint64_t a_ack = unwrap(ack, _isn, ackno);
if (a_ack > _next_seqno)
return; // impossible ack is ignored
if (a_ack > ackno) {
// reset the retransmission
rto = _initial_retransmission_timeout;
timer_ticks = ticks;
cons_retran = 0;
// erase elements from the tmp_segments
for (auto it = tmp_segments.begin(); it != tmp_segments.end();) {
if (a_ack >= it->seqno + it->data_size) {
tmp_size -= (it->segment).length_in_sequence_space();
// 如果FIN报文被成功接收,就关闭timer
// FIN_ACKED
if (it->segment.header().fin)
timer_start = false;
it = tmp_segments.erase(it);
} else
it++;
}
}
ackno = a_ack;
fill_window();
}

void TCPSender::tick(const size_t ms_since_last_tick) {
if (ticks > ticks + ms_since_last_tick) {
// 进行简单的溢出处理,还是有可能溢出
ticks -= timer_ticks;
timer_ticks = 0;
}
ticks += ms_since_last_tick;

if (timer_start && ticks > timer_ticks && ticks - timer_ticks >= rto) {
if (!tmp_segments.empty()) {
// resend
_segments_out.push(tmp_segments.front().segment);
if (window_size != 0) {
cons_retran++;
rto *= 2;
}
}
timer_ticks = ticks;
}
}

unsigned int TCPSender::consecutive_retransmissions() const { return cons_retran; }

/* 在TCPConnection中被使用的辅助方法们 */
void TCPSender::send_empty_segment() {
TCPSegment segment = TCPSegment();
segment.header().seqno = wrap(_next_seqno, _isn);
_segments_out.push(segment);
}
void TCPSender::send_empty_ack_segment(WrappingInt32 t_ackno) {
TCPSegment segment = TCPSegment();
segment.header().seqno = wrap(_next_seqno, _isn);
segment.header().ack = true;
segment.header().ackno = t_ackno;
_segments_out.push(segment);
}
void TCPSender::send_empty_rst_segment() {
TCPSegment segment = TCPSegment();
segment.header().seqno = wrap(_next_seqno, _isn);
segment.header().rst = true;
_segments_out.push(segment);
}