Other thoughts on things the labs don't cover

Network-layer implementation

In our protocol-stack implementation we wrote the transport-layer TCP protocol, the network-layer ARP protocol, and the data-link-layer Ethernet protocol ourselves; the remaining network-layer IP protocol is provided by the starter code. Let's take a look at how that network layer is implemented.

Overall architecture

You’ve done this already.

In Lab 4, we gave:

  1. an object that represents an Internet datagram and knows how to parse and serialize itself (tcp_helpers/ipv4_datagram.{hh,cc})
  2. the logic to encapsulate TCP segments in IP (now found in tcp_helpers/tcp_over_ip.cc).

The CS144TCPSocket uses these tools to connect your TCPConnection to a TUN device.

In other words, the IP protocol is implemented mainly in two places: ipv4_datagram.{hh,cc}, the class that abstracts an IP datagram, and tcp_helpers/tcp_over_ip.cc, the class that wraps TCP segments into IP datagrams. Beyond that, the IP layer also has to hook up to the layer below: in Labs 0-4 it connects to a TUN device through CS144TCPSocket, and in Labs 5-6 it connects to a TAP device.

We'll leave that wiring to the next part; for now let's look at the concrete implementation of the IP protocol.

Implementation

ipv4_datagram.hh && ipv4_header.hh

There isn't much to say about ipv4_datagram: its structure is exactly the same as TCPSegment's. ipv4_header isn't much to talk about either: it is simply the header of an IP datagram.
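As a rough reminder of what that header actually carries, here is a trimmed sketch of the IPv4Header fields (the field list is written from memory, so treat the exact members and defaults as approximate):

// Trimmed sketch of the IPv4 header fields (see tcp_helpers/ipv4_header.hh)
struct IPv4Header {
    uint8_t ver = 4;      // IP version
    uint8_t hlen = 5;     // header length (in 32-bit words)
    uint8_t tos = 0;      // type of service
    uint16_t len = 0;     // total length of packet, header included
    uint16_t id = 0;      // identification number
    bool df = true;       // don't-fragment flag
    bool mf = false;      // more-fragments flag
    uint16_t offset = 0;  // fragment offset
    uint8_t ttl = 64;     // time to live
    uint8_t proto = 6;    // protocol field (6 means the payload is a TCP segment)
    uint16_t cksum = 0;   // header checksum
    uint32_t src = 0;     // source address
    uint32_t dst = 0;     // destination address
    // plus parse()/serialize() helpers and pseudo_cksum(), which feeds the TCP checksum
};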

tcp_over_ip

Header file

Its header file is simple and contains only a single class definition:

// A converter from TCP segments to serialized IPv4 datagrams
class TCPOverIPv4Adapter : public FdAdapterBase {
public:
std::optional<TCPSegment> unwrap_tcp_in_ip(const InternetDatagram &ip_dgram);

InternetDatagram wrap_tcp_in_ip(TCPSegment &seg);
};
#endif // SPONGE_LIBSPONGE_TCP_OVER_IP_HH
Implementation

As you can see, compared with the TCP and ETH/ARP protocols, the IP implementation here is remarkably simple. As a middle layer it only has to wrap what the layer above hands it and pass it down, or unwrap what the layer below hands it and pass it up. It needs no complicated algorithms or data structures (nothing like TCP's reliable transmission or ETH/ARP's address learning), and it never has to talk to the outside world itself.

Apart from packing and unpacking, it only has to guarantee one thing: a host may hold only one TCP connection at a time. That also simplifies the implementation: when filling in the IP header, it can simply pull the parameters from the config it keeps.

// Unwraps an IPv4 datagram and extracts the TCP segment inside.
//! If this succeeds, it then checks that the received segment is related to the
//! current connection. When a TCP connection has been established, this means
//! checking that the source and destination ports in the TCP header are correct.
//!
//! If the TCP connection is listening and the TCP segment read from the wire
//! includes a SYN, this function clears the `_listen` flag and records the source
//! and destination addresses and port numbers from the TCP header; it uses this
//! information to filter future reads.
// Note: this is why a sponge host can only hold one TCP connection at a time;
// once a connection is set up, datagrams coming from any other host are ignored.

// returns a std::optional<TCPSegment> that is empty if the segment was invalid or unrelated
optional<TCPSegment> TCPOverIPv4Adapter::unwrap_tcp_in_ip(const InternetDatagram &ip_dgram) {
// is the IPv4 datagram for us?
// Note: it's valid to bind to address "0" (INADDR_ANY) and reply from actual address contacted
if (not listening() and (ip_dgram.header().dst != config().source.ipv4_numeric())) {
return {};
}

// is the IPv4 datagram from our peer?
// filter out datagrams that did not come from our peer
if (not listening() and (ip_dgram.header().src != config().destination.ipv4_numeric())) {
return {};
}

// does the IPv4 datagram claim that its payload is a TCP segment?
// we only need to unwrap TCP payloads
if (ip_dgram.header().proto != IPv4Header::PROTO_TCP) {
return {};
}

// is the payload a valid TCP segment?
TCPSegment tcp_seg;
if (ParseResult::NoError != tcp_seg.parse(ip_dgram.payload(), ip_dgram.header().pseudo_cksum())) {
return {};
}

// is the TCP segment for us?
if (tcp_seg.header().dport != config().source.port()) {
return {};
}

// should we target this source addr/port (and use its destination addr as our source) in reply?
if (listening()) {
// records the source and destination addresses and port numbers
if (tcp_seg.header().syn and not tcp_seg.header().rst) {
config_mutable().source = {inet_ntoa({htobe32(ip_dgram.header().dst)}), config().source.port()};
config_mutable().destination = {inet_ntoa({htobe32(ip_dgram.header().src)}), tcp_seg.header().sport};
set_listening(false);
} else {
return {};
}
}

// is the TCP segment from our peer?
if (tcp_seg.header().sport != config().destination.port()) {
return {};
}

return tcp_seg;
}

//! Takes a TCP segment, sets port numbers as necessary, and wraps it in an IPv4 datagram
//! \param[in] seg is the TCP segment to convert
InternetDatagram TCPOverIPv4Adapter::wrap_tcp_in_ip(TCPSegment &seg) {
// set the port numbers in the TCP segment
seg.header().sport = config().source.port();
seg.header().dport = config().destination.port();

// create an Internet Datagram and set its addresses and length
InternetDatagram ip_dgram;
ip_dgram.header().src = config().source.ipv4_numeric();
ip_dgram.header().dst = config().destination.ipv4_numeric();
// uint8_t hlen = LENGTH / 4; //!< header length
// uint8_t doff = LENGTH / 4; //!< data offset
ip_dgram.header().len = ip_dgram.header().hlen * 4 + seg.header().doff * 4 + seg.payload().size();

// set payload, calculating TCP checksum using information from IP header
ip_dgram.payload() = seg.serialize(ip_dgram.header().pseudo_cksum());

return ip_dgram;
}

Socket implementation

At the top level there are CS144TCPSocket and FullStackSocket.

The inheritance hierarchy is shown below:

Inheritance graph

TCPSocket is a pure wrapper class: its whole protocol stack lives in the kernel (so it has nothing to do with anything we write later); it presumably exists so that Lab 0's webget can be tested against it. CS144TCPSocket is what we use in Labs 0-4: its link layer is handled by the kernel, while the network and transport layers run in user space. FullStackSocket adds on top of that the user-space link layer we build in Lab 5.
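To get a feel for how these are used, here is a minimal sketch along the lines of the apps that ship with the labs (the address and the exact call sequence are illustrative, not copied from the repo):

CS144TCPSocket sock;                              // user-space TCP/IP on top of a TUN device
sock.connect(Address("169.254.144.1", 9090));     // the three-way handshake runs in our TCPConnection
sock.write("hello from sponge\n");                // bytes flow through the stack we wrote
const std::string reply = sock.read();            // ...and back up again
sock.wait_until_closed();                         // let the background thread finish the teardown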

The heart of it is the TCPSpongeSocket implementation; the rest are thin wrapper classes that need no further comment.

FileDescriptor

Treating a socket as an fd and the network as I/O: this abstraction is simply brilliant. Insanely good.

Header file

//! A reference-counted handle to a file descriptor
class FileDescriptor {
//! \brief A handle on a kernel file descriptor.
//! \details FileDescriptor objects contain a std::shared_ptr to a FDWrapper.
class FDWrapper {
public:
int _fd; // file descriptor number returned by the kernel
bool _eof = false; // has the fd reached EOF?
bool _closed = false; // has the fd been closed?
// how many times the fd has been read from / written to
unsigned _read_count = 0;
unsigned _write_count = 0;

//! Construct from a file descriptor number returned by the kernel
explicit FDWrapper(const int fd);
//! Closes the file descriptor upon destruction
~FDWrapper();
//! Calls [close(2)](\ref man2::close) on FDWrapper::_fd
void close();
//! An FDWrapper cannot be copied or moved
FDWrapper(const FDWrapper &other) = delete;
FDWrapper &operator=(const FDWrapper &other) = delete;
FDWrapper(FDWrapper &&other) = delete;
FDWrapper &operator=(FDWrapper &&other) = delete;
};

//! A reference-counted handle to a shared FDWrapper
std::shared_ptr<FDWrapper> _internal_fd;

// private constructor used to duplicate the FileDescriptor (increase the reference count); it bumps the refcount of the fd passed in, roughly a user-space dup
explicit FileDescriptor(std::shared_ptr<FDWrapper> other_shared_ptr);

protected:
void register_read() { ++_internal_fd->_read_count; } //!< increment read count
void register_write() { ++_internal_fd->_write_count; } //!< increment write count

public:
//! Construct from a file descriptor number returned by the kernel
explicit FileDescriptor(const int fd);

//! Free the std::shared_ptr; the FDWrapper destructor calls close() when the refcount goes to zero.
~FileDescriptor() = default;

/* reading and writing */
std::string read(const size_t limit = std::numeric_limits<size_t>::max());
void read(std::string &str, const size_t limit = std::numeric_limits<size_t>::max());
// possibly blocking until all is written
size_t write(const char *str, const bool write_all = true) { return write(BufferViewList(str), write_all); }
size_t write(const std::string &str, const bool write_all = true) { return write(BufferViewList(str), write_all); }
size_t write(BufferViewList buffer, const bool write_all = true);

//! Close the underlying file descriptor
void close() { _internal_fd->close(); }

//! Copy a FileDescriptor explicitly, increasing the FDWrapper refcount
FileDescriptor duplicate() const;

//! Set blocking(true) or non-blocking(false)
void set_blocking(const bool blocking_state);
// ...

Implementation

It's pretty much system calls all the way down, so there isn't much to explain; just a few points worth noting.
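For example, read() is little more than ::read() plus some bookkeeping. A simplified sketch from memory (the real version is more careful about sizing the buffer against `limit`):

void FileDescriptor::read(std::string &str, const size_t limit) {
    constexpr size_t BUFFER_SIZE = 1024 * 1024;  // read at most ~1 MiB per call
    const size_t size_to_read = std::min(BUFFER_SIZE, limit);
    str.resize(size_to_read);

    const ssize_t bytes_read = SystemCall("read", ::read(fd_num(), str.data(), size_to_read));
    if (limit > 0 && bytes_read == 0) {
        _internal_fd->_eof = true;               // a zero-byte read on a nonempty request means EOF
    }
    str.resize(bytes_read);

    register_read();                             // bump the read counter
}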

Wrapping system calls

The way it invokes system calls is worth a look; it's quite neat:

void FileDescriptor::set_blocking(const bool blocking_state) {
int flags = SystemCall("fcntl", fcntl(fd_num(), F_GETFL));
if (blocking_state) {
flags ^= (flags & O_NONBLOCK);
} else {
flags |= O_NONBLOCK;
}

SystemCall("fcntl", fcntl(fd_num(), F_SETFL, flags));
}

For instance, here the blocking behavior of reads and writes on the fd is toggled through the fcntl system call.

If you've done the OS labs, you'll remember how tedious it is to check the return value of every system call and handle the errors (think if (kalloc() == 0) or if (mappages() == 0), killing the process on failure, and so on). Back there we could live with that kind of redundancy, but not here: first, we want to stay object-oriented, and second, our focus is the network rather than the operating system, so it's better to wrap the checks once and cut down on duplicated code.

So besides making the system call itself, the code routes it through a wrapper, SystemCall, that guarantees the call succeeded or fails loudly. Looking at SystemCall's implementation, it really is just a layer of error checking:

int SystemCall(const char *attempt, const int return_value, const int errno_mask) {
if (return_value >= 0 || errno == errno_mask) {
return return_value;
}

throw unix_error(attempt);
}

// the OS kernel doesn't speak C++, so take care to convert to a C-style string
int SystemCall(const string &attempt, const int return_value, const int errno_mask) {
return SystemCall(attempt.c_str(), return_value, errno_mask);
}

Socket

Not much to say here: it is just a wrapper class around the operating system's socket interface.

Header file

// Base class for network sockets (TCP, UDP, etc.)
// Socket is generally used via a subclass. See TCPSocket and UDPSocket for usage examples.
class Socket : public FileDescriptor {
private:
//! Get the local or peer address the socket is connected to
Address get_address(const std::string &name_of_function,
const std::function<int(int, sockaddr *, socklen_t *)> &function) const;

protected:
Socket(const int domain, const int type);
Socket(FileDescriptor &&fd, const int domain, const int type);

template <typename option_type>
void setsockopt(const int level, const int option, const option_type &option_value);
public:
// Bind a socket to a local address, usually for listen/accept
void bind(const Address &address);
// Connect a socket to a peer address
void connect(const Address &address);
// Shut down a socket
void shutdown(const int how);
//! Get local address of socket
Address local_address() const;
//! Get peer address of socket
Address peer_address() const;
//! Allow local address to be reused sooner
void set_reuseaddr();
};

//! A wrapper around [UDP sockets](\ref man7::udp)
class UDPSocket : public Socket {
protected:
//! \brief Construct from FileDescriptor (used by TCPOverUDPSocketAdapter)
//! \param[in] fd is the FileDescriptor from which to construct
explicit UDPSocket(FileDescriptor &&fd) : Socket(std::move(fd), AF_INET, SOCK_DGRAM) {}

public:
//! Default: construct an unbound, unconnected UDP socket
UDPSocket() : Socket(AF_INET, SOCK_DGRAM) {}

// carries received data and information about the sender
struct received_datagram {
Address source_address; //!< Address from which this datagram was received
std::string payload; //!< UDP datagram payload
};
//! Receive a datagram and the Address of its sender
received_datagram recv(const size_t mtu = 65536);
//! Receive a datagram and the Address of its sender (caller can allocate storage)
void recv(received_datagram &datagram, const size_t mtu = 65536);

//! Send a datagram to specified Address
void sendto(const Address &destination, const BufferViewList &payload);
//! Send datagram to the socket's connected address (must call connect() first)
void send(const BufferViewList &payload);
};

//! A wrapper around [TCP sockets](\ref man7::tcp)
class TCPSocket : public Socket {
private:
// Construct from FileDescriptor (used by accept())
// fd is the FileDescriptor from which to construct
explicit TCPSocket(FileDescriptor &&fd) : Socket(std::move(fd), AF_INET, SOCK_STREAM) {}
public:
//! Default: construct an unbound, unconnected TCP socket
TCPSocket() : Socket(AF_INET, SOCK_STREAM) {}
//! Mark a socket as listening for incoming connections
void listen(const int backlog = 16);
//! Accept a new incoming connection
TCPSocket accept();
};

//! A wrapper around [Unix-domain stream sockets](\ref man7::unix)
class LocalStreamSocket : public Socket {
public:
// ... constructors
};
#endif // SPONGE_LIBSPONGE_SOCKET_HH

Implementation

Constructor parameters

Reference article

These are also the parameters of the socket system call; a bit of background knowledge never hurts.

  1. domain

    In these labs only the first two values are used, i.e. local (Unix-domain) communication and IPv4 network communication


  2. type

    Roughly speaking, SOCK_DGRAM gets you UDP and SOCK_STREAM gets you TCP (see the short usage sketch below).
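A tiny usage sketch of the two wrappers (illustrative only; the addresses and payloads are made up):

TCPSocket tcp;                                    // socket(AF_INET, SOCK_STREAM) -> TCP
tcp.connect(Address("127.0.0.1", 8080));
tcp.write("GET / HTTP/1.1\r\n\r\n");

UDPSocket udp;                                    // socket(AF_INET, SOCK_DGRAM) -> UDP
udp.sendto(Address("127.0.0.1", 5353), std::string("ping"));
auto dgram = udp.recv();                          // dgram.source_address / dgram.payload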

Code
/* Socket */
/* constructors */
// default constructor for socket of (subclassed) domain and type
Socket::Socket(const int domain, const int type) : FileDescriptor(SystemCall("socket", socket(domain, type, 0))) {}
// construct from file descriptor
Socket::Socket(FileDescriptor &&fd, const int domain, const int type) : FileDescriptor(move(fd)) { ... }

// get the local or peer address the socket is connected to
// private helper, presumably there just to share code between the two functions below (though honestly I don't think it was strictly necessary)
Address Socket::get_address(const string &name_of_function,const function<int(int, sockaddr *, socklen_t *)> &function) const {
Address::Raw address;
socklen_t size = sizeof(address);
SystemCall(name_of_function, function(fd_num(), address, &size));
return {address, size};
}
Address Socket::local_address() const { return get_address("getsockname", getsockname); }
Address Socket::peer_address() const { return get_address("getpeername", getpeername); }

/*
These two functions attach the socket to the client/server endpoints:
to tie the socket's local end to this host, call bind; to tie it to some remote peer, call connect.
*/
// bind socket to a specified local address (usually to listen/accept)
// address is a local Address to bind
void Socket::bind(const Address &address) { SystemCall("bind", ::bind(fd_num(), address, address.size())); }
// connect socket to a specified peer address
// address is the peer's Address
void Socket::connect(const Address &address) { SystemCall("connect", ::connect(fd_num(), address, address.size())); }

// shut down a socket in the specified way
// how can be `SHUT_RD`, `SHUT_WR`, or `SHUT_RDWR`
void Socket::shutdown(const int how) {
SystemCall("shutdown", ::shutdown(fd_num(), how));
switch (how) {
case SHUT_RD:
register_read();
break;
// ...
}
}

// set a socket option: pass the protocol level plus the option's key and value
template <typename option_type>
void Socket::setsockopt(const int level, const int option, const option_type &option_value) {
SystemCall("setsockopt", ::setsockopt(fd_num(), level, option, &option_value, sizeof(option_value)));
}

// allow local address to be reused sooner, at the cost of some robustness
// let the local address be reused sooner, at the cost of some robustness
// Using `SO_REUSEADDR` may reduce the robustness of your application
void Socket::set_reuseaddr() { setsockopt(SOL_SOCKET, SO_REUSEADDR, int(true)); }

/* UDPSocket */
// receive data from the socket and put it into `datagram`
// If mtu is too small to hold the received datagram, this method throws a runtime_error
void UDPSocket::recv(received_datagram &datagram, const size_t mtu) {
// receive source address and payload
// ...
const ssize_t recv_len = SystemCall(
"recvfrom",
::recvfrom(
fd_num(), datagram.payload.data(), datagram.payload.size(), MSG_TRUNC, datagram_source_address, &fromlen));
// ...
}
UDPSocket::received_datagram UDPSocket::recv(const size_t mtu) {
received_datagram ret{{nullptr, 0}, ""};
recv(ret, mtu);
return ret;
}

// send data out through the socket
void sendmsg_helper(const int fd_num,
const sockaddr *destination_address,
const socklen_t destination_address_len,
const BufferViewList &payload) {
// ...
const ssize_t bytes_sent = SystemCall("sendmsg", ::sendmsg(fd_num, &message, 0));
// ...
}
void UDPSocket::sendto(const Address &destination, const BufferViewList &payload) {
sendmsg_helper(fd_num(), destination, destination.size(), payload);
register_write();
}
void UDPSocket::send(const BufferViewList &payload) {
sendmsg_helper(fd_num(), nullptr, 0, payload);
register_write();
}
// ...

TCPSpongeSocket

The two classes above are really just wrappers that turn system calls into C++ classes, which can feel abstract and bewildering. But from here on it's different: this is where the TCP code we wrote earlier finally comes into play!

Apart from read/write and close, which behave just like on an fd or an ordinary socket, TCPSpongeSocket's most distinctive job is establishing and releasing the TCP connection. The state transitions themselves were implemented by us in Labs 0-4; this socket class only adds event listening and lifecycle management of the TCPConnection object.

Two threads

Before going into its two jobs, event listening and lifecycle management, it's worth looking at the overall architecture first.

TCPSpongeSocket is built on two threads. One thread serves its owner: it carries out the services exposed to the owner, such as connect, read, and write. The other thread runs the TCPConnection: it keeps calling the connection's tick method and listens for events.

//! \class TCPSpongeSocket
//! This class involves the simultaneous operation of two threads.
//!
//! One, the "owner" or foreground thread, interacts with this class in much the
//! same way as one would interact with a TCPSocket: it connects or listens, writes to
//! and reads from a reliable data stream, etc. Only the owner thread calls public
//! methods of this class.
//!
//! The other, the "TCPConnection" thread, takes care of the back-end tasks that the kernel would
//! perform for a TCPSocket: reading and parsing datagrams from the wire, filtering out
//! segments unrelated to the connection, etc.
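The state behind this two-thread design looks roughly like the following (a trimmed sketch of tcp_sponge_socket.hh from memory; exact member names and access specifiers may differ slightly):

// Sketch of TCPSpongeSocket's core members (simplified)
template <typename AdaptT>
class TCPSpongeSocket : public LocalStreamSocket {
    LocalStreamSocket _thread_data;       // owner-facing end of a socketpair; read()/write() go through here
    AdaptT _datagram_adapter;             // wire-facing side that wraps/unwraps TCP segments (TUN, TAP, UDP, ...)
    std::optional<TCPConnection> _tcp{};  // the TCP implementation from Labs 0-4
    EventLoop _eventloop{};               // poll()-based event loop driven by the TCPConnection thread
    std::thread _tcp_thread{};            // the background "TCPConnection" thread
    // ...
};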

Event listening

The core of the event listening is the _tcp_loop method, the initialization of _eventloop inside _initialize_TCP, and the implementation of EventLoop itself.

Reading through it isn't actually that hard (though there are plenty of details I was too lazy to dig into), and I think it's well worth learning from.

_initialize_TCP

Its main job is to register the events we want to listen for. There are four: data coming in from the app, data to be delivered to the app, data coming in from the lower protocol, and data to be sent to the lower protocol. The code and its comments are detailed enough that I won't repeat them.

Note that TCP interacts with the protocol stack below (sending and receiving datagrams) through AdaptT _datagram_adapter, and interacts with the app above (passing data) through LocalStreamSocket _thread_data.

template <typename AdaptT>
void TCPSpongeSocket<AdaptT>::_initialize_TCP(const TCPConfig &config) {
_tcp.emplace(config);
// Set up the event loop

// There are four possible events to handle:
//
// 1) Incoming datagram received (needs to be given to
// TCPConnection::segment_received method) - data handed up by the stack below
//
// 2) Outbound bytes received from local application via a write()
// call (needs to be read from the local stream socket and
// given to TCPConnection::data_written method) - data handed down by the app above
//
// 3) Incoming bytes reassembled by the TCPConnection
// (needs to be read from the inbound_stream and written
// to the local stream socket back to the application) - TCP has data to deliver to the app
//
// 4) Outbound segment generated by TCP (needs to be
// given to underlying datagram socket) - TCP has segments to send to the outside world

// rule 1: read from filtered packet stream and dump into TCPConnection (data arriving from the outside world)
_eventloop.add_rule(_datagram_adapter,
Direction::In,
[&] {
auto seg = _datagram_adapter.read();
if (seg) {
_tcp->segment_received(move(seg.value()));
}
if (_thread_data.eof() and _tcp.value().bytes_in_flight() == 0 and not _fully_acked) { _fully_acked = true; }
},
[&] { return _tcp->active(); });

// rule 2: read from pipe into outbound buffer (data arriving from the app)
_eventloop.add_rule(
// LocalStreamSocket _thread_data;
// so the user's data is written in through this socket
_thread_data,
Direction::In,
[&] {
const auto data = _thread_data.read(_tcp->remaining_outbound_capacity());
const auto len = data.size();
const auto amount_written = _tcp->write(move(data));
if (amount_written != len) {
throw runtime_error("TCPConnection::write() accepted less than advertised length");
}
if (_thread_data.eof()) {
_tcp->end_input_stream();
_outbound_shutdown = true;
}
},
[&] { return (_tcp->active()) and (not _outbound_shutdown) and (_tcp->remaining_outbound_capacity() > 0); },
[&] {
_tcp->end_input_stream();
_outbound_shutdown = true;
});

// rule 3: read from inbound buffer into pipe (write data up to the app)
_eventloop.add_rule(
_thread_data,
Direction::Out,
[&] {
ByteStream &inbound = _tcp->inbound_stream();
// Write from the inbound_stream into the pipe
const size_t amount_to_write = min(size_t(65536), inbound.buffer_size());
const std::string buffer = inbound.peek_output(amount_to_write);
// done by writing into the socket
const auto bytes_written = _thread_data.write(move(buffer), false);
inbound.pop_output(bytes_written);

if (inbound.eof() or inbound.error()) {
_thread_data.shutdown(SHUT_WR);
_inbound_shutdown = true;
}
},
[&] {
return (not _tcp->inbound_stream().buffer_empty()) or
((_tcp->inbound_stream().eof() or _tcp->inbound_stream().error()) and not _inbound_shutdown);
});

// rule 4: read outbound segments from TCPConnection and send as datagrams (write data out to the wire)
_eventloop.add_rule(_datagram_adapter,
Direction::Out,
[&] {
while (not _tcp->segments_out().empty()) {
// done by writing to the adapter
_datagram_adapter.write(_tcp->segments_out().front());
_tcp->segments_out().pop();
}
},
[&] { return not _tcp->segments_out().empty(); });
}
_tcp_loop

As you can see, while the condition holds, _tcp_loop does two things: it listens for all the events we registered in _eventloop earlier, and it calls TCPConnection's tick method to drive the passage of time.

// condition is a function returning true if loop should continue
// Process events while specified condition is true
// repeatedly evaluate the condition and wait for events, and drive TCP's tick
template <typename AdaptT>
void TCPSpongeSocket<AdaptT>::_tcp_loop(const function<bool()> &condition) {
auto base_time = timestamp_ms();
// keep listening for events as long as the condition holds
while (condition()) {
// wait for the next event registered in the event loop
auto ret = _eventloop.wait_next_event(TCP_TICK_MS);
// exit or abort requested
if (ret == EventLoop::Result::Exit or _abort) {
break;
}
// if the TCP connection is still alive, call its tick method
if (_tcp.value().active()) {
const auto next_time = timestamp_ms();
_tcp.value().tick(next_time - base_time);
_datagram_adapter.tick(next_time - base_time);
base_time = next_time;
}
}
}
eventloop

Concretely, the eventloop does its event listening through the poll mechanism that Linux provides.

The Linux poll mechanism

Once again I'm struck by how powerful the "the network is just I/O" abstraction is. The operating system's poll mechanism and the poll function are fundamentally designed around I/O, and precisely because networking is I/O at heart, because sending and receiving packets and talking to the upper app all happen through file descriptors, the same mechanism can be reused here for event-driven reads and writes.

My verdict: I'm in complete awe.


The poll function is one implementation of waiting on I/O.

int poll(struct pollfd *fds, nfds_t nfds, int timeout);

The event type events can take the following values:

POLLIN: data available to read
POLLRDNORM: normal data available to read (equivalent to POLLIN)
POLLRDBAND: priority band data available to read
POLLPRI: urgent data available to read
POLLOUT: writing will not block
POLLWRNORM: writing normal data will not block
POLLWRBAND: writing priority data will not block
POLLMSG: a SIGPOLL message is available
POLLERR: an error occurred on the given file descriptor
POLLHUP: the given file descriptor was hung up
POLLNVAL: invalid request, the given file descriptor is not open

In the earlier rule registration for the eventloop:

_eventloop.add_rule(_datagram_adapter,
Direction::In,
[&] { ... });

This says: when the Direction::In event happens on the "file" _datagram_adapter, run the callback in [&]. So what is Direction::In?

enum class Direction : short {
In = POLLIN, //!< Callback will be triggered when Rule::fd is readable.
Out = POLLOUT //!< Callback will be triggered when Rule::fd is writable.
};

So the eventloop really does its listening through the I/O event mechanism the OS provides.

The concrete listening and dispatch logic lives in wait_next_event. Essentially, it prunes the rules we are no longer interested in or that are already dead (say, ones whose fd has been closed), then finds the rules that fired and invokes their callbacks.

The actual code is slightly involved; have a look if you're interested, I won't paste it here, but a stripped-down sketch of the idea follows.
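The sketch below is my own illustration of the pattern rather than the real EventLoop code; it ignores the cancellation and cleanup logic entirely and just shows the poll-then-dispatch core:

#include <poll.h>
#include <functional>
#include <vector>

struct Rule {
    int fd;                           // file descriptor to watch
    short direction;                  // POLLIN or POLLOUT
    std::function<void()> callback;   // what to run when the fd is ready
    std::function<bool()> interest;   // only poll the fd while this returns true
};

// Wait up to timeout_ms for any interesting rule to fire, then run its callback.
void wait_next_event(std::vector<Rule> &rules, const int timeout_ms) {
    std::vector<pollfd> pollfds(rules.size());
    for (size_t i = 0; i < rules.size(); i++) {
        pollfds[i].fd = rules[i].fd;
        pollfds[i].events = rules[i].interest() ? rules[i].direction : 0;
    }

    if (poll(pollfds.data(), pollfds.size(), timeout_ms) <= 0) {
        return;                       // timeout or error: nothing to dispatch
    }

    for (size_t i = 0; i < rules.size(); i++) {
        if (pollfds[i].revents & pollfds[i].events) {
            rules[i].callback();      // fd became readable/writable: run the handler
        }
    }
}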

Lifecycle management

The core pieces are the methods connect, listen_and_accept, and _tcp_main.

connect

Called by the client.

// called by the client
// the owner thread blocks here until the connection is established
template <typename AdaptT>
void TCPSpongeSocket<AdaptT>::connect(const TCPConfig &c_tcp, const FdAdapterConfig &c_ad) {
// initialize TCP and its event listeners
_initialize_TCP(c_tcp);
// initialize the adapter
_datagram_adapter.config_mut() = c_ad;

cerr << "DEBUG: Connecting to " << c_ad.destination.to_string() << "...\n";
// our own code: send the SYN segment
_tcp->connect();

// unified state bookkeeping
const TCPState expected_state = TCPState::State::SYN_SENT;
// wait until the condition turns false, i.e. we leave SYN_SENT and move on to ESTABLISHED
_tcp_loop([&] { return _tcp->state() == TCPState::State::SYN_SENT; });
cerr << "Successfully connected to " << c_ad.destination.to_string() << ".\n";

// once connected, start the connection thread, which runs _tcp_main and keeps listening for events until the connection dies
_tcp_thread = thread(&TCPSpongeSocket::_tcp_main, this);
}
_tcp_main

Responsible for watching the ESTABLISHED state and for the cleanup work once the TCP connection shuts down.

template <typename AdaptT>
void TCPSpongeSocket<AdaptT>::_tcp_main() {
try {
if (not _tcp.has_value()) {
throw runtime_error("no TCP");
}
// keep listening until the connection dies
_tcp_loop([] { return true; });
shutdown(SHUT_RDWR);
if (not _tcp.value().active()) {
cerr << "DEBUG: TCP connection finished "
<< (_tcp.value().state() == TCPState::State::RESET ? "uncleanly" : "cleanly.\n");
}
_tcp.reset();
} catch (const exception &e) {
cerr << "Exception in TCPConnection runner thread: " << e.what() << "\n";
throw e;
}
}
listen_and_accept

Called by the server.

// called by the server
// the owner thread blocks here until an incoming connection arrives
template <typename AdaptT>
void TCPSpongeSocket<AdaptT>::listen_and_accept(const TCPConfig &c_tcp, const FdAdapterConfig &c_ad) {
_initialize_TCP(c_tcp);
_datagram_adapter.config_mut() = c_ad;

_datagram_adapter.set_listening(true);

cerr << "DEBUG: Listening for incoming connection...\n";
// wait until ESTABLISHED; note the state condition here
// the actual sending/receiving of segments is done by the events inside _tcp_loop
_tcp_loop([&] {
const auto s = _tcp->state();
return (s == TCPState::State::LISTEN or s == TCPState::State::SYN_RCVD or s == TCPState::State::SYN_SENT);
});
cerr << "New connection from " << _datagram_adapter.config().destination.to_string() << ".\n";

// start the connection thread
_tcp_thread = thread(&TCPSpongeSocket::_tcp_main, this);
}

CS144TCPSocket and FullStackSocket

The main course (the class above) is done. These two are simple wrapper classes with little to add: they mostly just pass configuration along, and the real difference between them is the adapter.
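To show how thin they are, their constructors essentially just pick an adapter and a device. The sketch below is paraphrased from memory, so the device names and default addresses may not match the repo exactly:

// CS144TCPSocket: user-space TCP/IP on top of a kernel TUN device
CS144TCPSocket::CS144TCPSocket()
    : TCPOverIPv4SpongeSocket(TCPOverIPv4OverTunFdAdapter(TunFD("tun144"))) {}

// FullStackSocket: additionally runs our Lab 5 link layer on top of a TAP device
FullStackSocket::FullStackSocket()
    : TCPOverIPv4OverEthernetSpongeSocket(TCPOverIPv4OverEthernetAdapter(TapFD("tap10"),
                                                                         random_host_ethernet_address(),
                                                                         Address("169.254.10.9", "0"),
                                                                         Address("169.254.10.1", "0"))) {}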

Adapter implementation

In our TCPSpongeSocket implementation we introduced the notion of an "adapter".

  protected:
//! Adapter to underlying datagram socket (e.g., UDP or IP)
AdaptT _datagram_adapter;

using TCPOverUDPSpongeSocket = TCPSpongeSocket<TCPOverUDPSocketAdapter>;
using TCPOverIPv4SpongeSocket = TCPSpongeSocket<TCPOverIPv4OverTunFdAdapter>;
using TCPOverIPv4OverEthernetSpongeSocket = TCPSpongeSocket<TCPOverIPv4OverEthernetAdapter>;

using LossyTCPOverUDPSpongeSocket = TCPSpongeSocket<LossyTCPOverUDPSocketAdapter>;
using LossyTCPOverIPv4SpongeSocket = TCPSpongeSocket<LossyTCPOverIPv4OverTunFdAdapter>;

It distills, in classic strategy-pattern style, the code shared by all the protocol stacks in this lab into TCPSpongeSocket, while the parts that differ between stacks are handled by the adapter.

Inside TCPSpongeSocket, the adapter is mainly involved in the following:

  1. the adapter's tick function

    // in tcp_loop
    _tcp.value().tick(next_time - base_time);
    _datagram_adapter.tick(next_time - base_time);
  2. serving as the I/O stream whose events we subscribe to

    _eventloop.add_rule(_datagram_adapter,
    Direction::In,
    [&] {
    // ...
  3. the TCP layer reads and writes it to receive and send TCP segments

    auto seg = _datagram_adapter.read();
    _datagram_adapter.write(_tcp->segments_out().front());
  4. holding the various configuration parameters

    datagram_adapter.config().destination.to_string()

Inheritance graph

Honestly there isn't much to say about the concrete implementation: it really is just the handful of methods above, each wrapping a bit of interaction with the TUN/TAP interfaces the OS provides, and the code is fairly simple, so I won't walk through it here.
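Just to give a flavor of "the handful of methods", the TUN-based adapter's read and write are roughly unwrap/wrap plus a TUN fd. A simplified sketch (error handling trimmed; the member name _tun is how I remember it):

// Read one IP datagram from the TUN device and try to extract a TCP segment from it.
std::optional<TCPSegment> TCPOverIPv4OverTunFdAdapter::read() {
    InternetDatagram ip_dgram;
    if (ip_dgram.parse(_tun.read()) != ParseResult::NoError) {
        return {};                         // not a well-formed IPv4 datagram
    }
    return unwrap_tcp_in_ip(ip_dgram);     // the filtering logic shown earlier
}

// Wrap an outgoing TCP segment in an IP datagram and write it out to the TUN device.
void TCPOverIPv4OverTunFdAdapter::write(TCPSegment &seg) {
    _tun.write(wrap_tcp_in_ip(seg).serialize());
}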

apps

Besides the protocol stack itself, the apps folder contains quite a few example applications built on the stack we implemented. I think it's well worth understanding these examples too.

bidirectional_stream_copy

Its job is to tie stdin/stdout to a socket. It reads from stdin and writes that into the socket as the upper app's input; it reads the socket's output and hands it to the upper app, i.e. stdout. Concretely it places two byte streams between stdin/stdout and the socket, _inbound and _outbound.

Since stdin, stdout, and the socket are all fds at heart, we can keep using the same event-driven approach as above. We only need four rules: whenever the socket has output, read it into the inbound bytestream; whenever the inbound bytestream has data, write it to stdout; whenever stdin has input, write it into the outbound bytestream; and whenever the outbound bytestream has data, write it to the socket.

So the concrete implementation is basically TCPSpongeSocket::_initialize_TCP plus TCPSpongeSocket::_tcp_loop rolled into one: subscribe to events, then loop and wait. Since it mirrors what we saw above, I won't paste the code here, beyond the one-rule sketch below.
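A minimal sketch of just one of the four rules (stdin into _outbound), assuming an EventLoop _eventloop, a ByteStream _outbound, and a FileDescriptor _input wrapping stdin; the names are illustrative, not necessarily the ones used in the app:

// rule: when stdin is readable and the outbound stream has room, copy bytes across
_eventloop.add_rule(_input,
                    Direction::In,
                    [&] {
                        _outbound.write(_input.read(_outbound.remaining_capacity()));
                        if (_input.eof()) {
                            _outbound.end_input();   // propagate EOF into the stream
                        }
                    },
                    [&] { return (not _outbound.error()) and (_outbound.remaining_capacity() > 0); });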

Other bits

The rest is all too complicated; with my modest skills I can't quite follow it, and frankly I was too lazy to keep reading. So this part is postponed indefinitely, for now.