An object that represents an Internet datagram and knows how to parse and serialize itself (tcp_helpers/ipv4_datagram.{hh,cc}). 它表示了Internet datagram的数据结构,并且可以自己解析和序列化。
the logic to encapsulate(封装) TCP segments in IP (now found in tcp_helpers/tcp_over_ip.cc).
The CS144TCPSocket uses these tools to connect your TCPConnection to a TUN device.
// A converter from TCP segments to serialized IPv4 datagrams classTCPOverIPv4Adapter : public FdAdapterBase { public: std::optional<TCPSegment> unwrap_tcp_in_ip(const InternetDatagram &ip_dgram);
// 用来拆IP数据包为一个TCP数据包 //! If this succeeds, it then checks that the received segment is related to the //! current connection. When a TCP connection has been established, this means // 如果TCP连接已建立,则会检查src和dst端口号的正确性 //! checking that the source and destination ports in the TCP header are correct. //! //! If the TCP connection is listening 如果处于listening状态,并且参数为SYN报文 //! and the TCP segment read from the wire includes a SYN, this function clears the // 就需要解除listening的flag,记录下src和dst的地址和端口号 //! `_listen` flag and records the source and destination addresses and port numbers // 目的是为了 filter future reads // 这说明我们的sponge实现是单线程的,也就是一台主机只能同时建立一个TCP连接 // 并且在此时会忽略其他主机发过来的数据包 //! from the TCP header; it uses this information to filter future reads.
// returns a std::optional<TCPSegment> that is empty if the segment was invalid or unrelated optional<TCPSegment> TCPOverIPv4Adapter::unwrap_tcp_in_ip(const InternetDatagram &ip_dgram){ // is the IPv4 datagram for us? // Note: it's valid to bind to address "0" (INADDR_ANY) and reply from actual address contacted if (notlistening() and (ip_dgram.header().dst != config().source.ipv4_numeric())) { return {}; }
// is the IPv4 datagram from our peer? // 过滤非peer发来的其他数据包 if (notlistening() and (ip_dgram.header().src != config().destination.ipv4_numeric())) { return {}; }
// does the IPv4 datagram claim that its payload is a TCP segment? // 我们只需解包TCP数据报 if (ip_dgram.header().proto != IPv4Header::PROTO_TCP) { return {}; }
// is the payload a valid TCP segment? TCPSegment tcp_seg; if (ParseResult::NoError != tcp_seg.parse(ip_dgram.payload(), ip_dgram.header().pseudo_cksum())) { return {}; }
// is the TCP segment for us? if (tcp_seg.header().dport != config().source.port()) { return {}; }
// should we target this source addr/port (and use its destination addr as our source) in reply? if (listening()) { // records the source and destination addresses and port numbers if (tcp_seg.header().syn andnot tcp_seg.header().rst) { config_mutable().source = {inet_ntoa({htobe32(ip_dgram.header().dst)}), config().source.port()}; config_mutable().destination = {inet_ntoa({htobe32(ip_dgram.header().src)}), tcp_seg.header().sport}; set_listening(false); } else { return {}; } }
// is the TCP segment from our peer? if (tcp_seg.header().sport != config().destination.port()) { return {}; }
return tcp_seg; }
//! Takes a TCP segment, sets port numbers as necessary, and wraps it in an IPv4 datagram //! \param[in] seg is the TCP segment to convert InternetDatagram TCPOverIPv4Adapter::wrap_tcp_in_ip(TCPSegment &seg){ // set the port numbers in the TCP segment seg.header().sport = config().source.port(); seg.header().dport = config().destination.port();
// create an Internet Datagram and set its addresses and length InternetDatagram ip_dgram; ip_dgram.header().src = config().source.ipv4_numeric(); ip_dgram.header().dst = config().destination.ipv4_numeric(); // uint8_t hlen = LENGTH / 4; //!< header length // uint8_t doff = LENGTH / 4; //!< data offset ip_dgram.header().len = ip_dgram.header().hlen * 4 + seg.header().doff * 4 + seg.payload().size();
// set payload, calculating TCP checksum using information from IP header ip_dgram.payload() = seg.serialize(ip_dgram.header().pseudo_cksum());
//! A reference-counted handle to a file descriptor classFileDescriptor { //! \brief A handle on a kernel file descriptor. //! \details FileDescriptor objects contain a std::shared_ptr to a FDWrapper. classFDWrapper { public: int _fd; // file descriptor number returned by the kernel bool _eof = false; // fd是否eof bool _closed = false; // fd是否close // fd被读写的次数 unsigned _read_count = 0; unsigned _write_count = 0;
//! Construct from a file descriptor number returned by the kernel explicitFDWrapper(constint fd); //! Closes the file descriptor upon destruction ~FDWrapper(); //! Calls [close(2)](\ref man2::close) on FDWrapper::_fd voidclose(); //! An FDWrapper cannot be copied or moved FDWrapper(const FDWrapper &other) = delete; FDWrapper &operator=(const FDWrapper &other) = delete; FDWrapper(FDWrapper &&other) = delete; FDWrapper &operator=(FDWrapper &&other) = delete; };
//! A reference-counted handle to a shared FDWrapper std::shared_ptr<FDWrapper> _internal_fd;
// private constructor used to duplicate the FileDescriptor (increase the reference count) 这个构造函数会增加其参数传进来的那个fd的引用,也许相当于dump explicitFileDescriptor(std::shared_ptr<FDWrapper> other_shared_ptr);
// Base class for network sockets (TCP, UDP, etc.) // Socket is generally used via a subclass. See TCPSocket and UDPSocket for usage examples. classSocket : public FileDescriptor { private: //! Get the local or peer address the socket is connected to Address get_address(const std::string &name_of_function, const std::function<int(int, sockaddr *, socklen_t *)> &function)const;
protected: Socket(constint domain, constint type); Socket(FileDescriptor &&fd, constint domain, constint type); template <typename option_type> voidsetsockopt(constint level, constint option, const option_type &option_value); public: // Bind a socket to a local address, usually for listen/accept voidbind(const Address &address); // Connect a socket to a peer address voidconnect(const Address &address); // Shut down a socket voidshutdown(constint how); //! Get local address of socket Address local_address()const; //! Get peer address of socket Address peer_address()const; //! Allow local address to be reused sooner voidset_reuseaddr(); };
//! A wrapper around [UDP sockets](\ref man7::udp) classUDPSocket : public Socket { protected: //! \brief Construct from FileDescriptor (used by TCPOverUDPSocketAdapter) //! \param[in] fd is the FileDescriptor from which to construct explicitUDPSocket(FileDescriptor &&fd) : Socket(std::move(fd), AF_INET, SOCK_DGRAM) {}
// carries received data and information about the sender structreceived_datagram { Address source_address; //!< Address from which this datagram was received std::string payload; //!< UDP datagram payload }; //! Receive a datagram and the Address of its sender received_datagram recv(constsize_t mtu = 65536); //! Receive a datagram and the Address of its sender (caller can allocate storage) voidrecv(received_datagram &datagram, constsize_t mtu = 65536);
//! Send a datagram to specified Address voidsendto(const Address &destination, const BufferViewList &payload); //! Send datagram to the socket's connected address (must call connect() first) voidsend(const BufferViewList &payload); };
//! A wrapper around [TCP sockets](\ref man7::tcp) classTCPSocket : public Socket { private: // Construct from FileDescriptor (used by accept()) // fd is the FileDescriptor from which to construct explicitTCPSocket(FileDescriptor &&fd) : Socket(std::move(fd), AF_INET, SOCK_STREAM) {} public: //! Default: construct an unbound, unconnected TCP socket TCPSocket() : Socket(AF_INET, SOCK_STREAM) {} //! Mark a socket as listening for incoming connections voidlisten(constint backlog = 16); //! Accept a new incoming connection TCPSocket accept(); };
//! A wrapper around [Unix-domain stream sockets](\ref man7::unix) classLocalStreamSocket : public Socket { public: // ...构造器 }; #endif// SPONGE_LIBSPONGE_SOCKET_HH
/* Socket */ /* 构造器 */ // default constructor for socket of (subclassed) domain and type Socket::Socket(constint domain, constint type) : FileDescriptor(SystemCall("socket", socket(domain, type, 0))) {} // construct from file descriptor Socket::Socket(FileDescriptor &&fd, constint domain, constint type) : FileDescriptor(move(fd)) { ... }
// get the local or peer address the socket is connected to // 此为private函数,应该是用于方便下面那两个函数的,虽然我觉得这个设计意图没什么必要() Address Socket::get_address(const string &name_of_function,const function<int(int, sockaddr *, socklen_t *)> &function)const{ Address::Raw address; socklen_t size = sizeof(address); SystemCall(name_of_function, function(fd_num(), address, &size)); return {address, size}; } Address Socket::local_address()const{ returnget_address("getsockname", getsockname); } Address Socket::peer_address()const{ returnget_address("getpeername", getpeername); }
/* 这两个函数是用于把socket连到CS的 将socket的一端连上本机,就需要调用bind;连上别的什么东西就要用connect */ // bind socket to a specified local address (usually to listen/accept) // address is a local Address to bind voidSocket::bind(const Address &address){ SystemCall("bind", ::bind(fd_num(), address, address.size())); } // connect socket to a specified peer address // address is the peer's Address voidSocket::connect(const Address &address){ SystemCall("connect", ::connect(fd_num(), address, address.size())); }
// shut down a socket in the specified way // how can be `SHUT_RD`, `SHUT_WR`, or `SHUT_RDWR` voidSocket::shutdown(constint how){ SystemCall("shutdown", ::shutdown(fd_num(), how)); switch (how) { case SHUT_RD: register_read(); break; // ... } }
// allow local address to be reused sooner, at the cost of some robustness // 以鲁棒性为代价,让local address可复用 // Using `SO_REUSEADDR` may reduce the robustness of your application voidSocket::set_reuseaddr(){ setsockopt(SOL_SOCKET, SO_REUSEADDR, int(true)); }
/* UDPSocket */ // 从socket中接收数据并放进datagram中 // If mtu is too small to hold the received datagram, this method throws a runtime_error voidUDPSocket::recv(received_datagram &datagram, constsize_t mtu){ // receive source address and payload // ... constssize_t recv_len = SystemCall( "recvfrom", ::recvfrom( fd_num(), datagram.payload.data(), datagram.payload.size(), MSG_TRUNC, datagram_source_address, &fromlen)); // ... } UDPSocket::received_datagram UDPSocket::recv(constsize_t mtu){ received_datagram ret{{nullptr, 0}, ""}; recv(ret, mtu); return ret; }
//! \class TCPSpongeSocket //! This class involves the simultaneous operation of two threads. //! //! One, the "owner" or foreground thread, interacts with this class in much the //! same way as one would interact with a TCPSocket: it connects or listens, writes to //! and reads from a reliable data stream, etc. Only the owner thread calls public //! methods of this class. //! //! The other, the "TCPConnection" thread, takes care of the back-end tasks that the kernel would //! perform for a TCPSocket: reading and parsing datagrams from the wire, filtering out //! segments unrelated to the connection, etc.
//! \brief Construct the TCP connection object and register the four event-loop rules that drive it.
//! \param[in] config is forwarded to construct `_tcp`
template <typename AdaptT>
void TCPSpongeSocket<AdaptT>::_initialize_TCP(const TCPConfig &config) {
    _tcp.emplace(config);

    // Set up the event loop

    // There are four possible events to handle:
    //
    // 1) Incoming datagram received (needs to be given to
    //    TCPConnection::segment_received method) -- data arriving from the lower layers
    //
    // 2) Outbound bytes received from local application via a write()
    //    call (needs to be read from the local stream socket and
    //    given to TCPConnection::data_written method) -- data arriving from the application
    //
    // 3) Incoming bytes reassembled by the TCPConnection
    //    (needs to be read from the inbound_stream and written
    //    to the local stream socket back to the application) -- data delivered to the application
    //
    // 4) Outbound segment generated by TCP (needs to be
    //    given to underlying datagram socket) -- data sent to the outside world

    // rule 1: read from filtered packet stream and dump into TCPConnection
    _eventloop.add_rule(
        _datagram_adapter,
        Direction::In,
        [&] {
            auto seg = _datagram_adapter.read();
            if (seg) {
                _tcp->segment_received(move(seg.value()));
            }

            // latch _fully_acked once the local side hit EOF and nothing remains in flight
            if (_thread_data.eof() and _tcp.value().bytes_in_flight() == 0 and not _fully_acked) {
                _fully_acked = true;
            }
        },
        [&] { return _tcp->active(); });

    // rule 2: read from pipe into outbound buffer
    // (_thread_data is the local stream socket, so the application's writes arrive through it)
    _eventloop.add_rule(
        _thread_data,
        Direction::In,
        [&] {
            const auto data = _thread_data.read(_tcp->remaining_outbound_capacity());
            const auto len = data.size();
            const auto amount_written = _tcp->write(move(data));
            if (amount_written != len) {
                throw runtime_error("TCPConnection::write() accepted less than advertised length");
            }

            if (_thread_data.eof()) {
                _tcp->end_input_stream();
                _outbound_shutdown = true;
            }
        },
        [&] { return (_tcp->active()) and (not _outbound_shutdown) and (_tcp->remaining_outbound_capacity() > 0); },
        // NOTE(review): presumably invoked when this rule is cancelled -- confirm against add_rule
        [&] {
            _tcp->end_input_stream();
            _outbound_shutdown = true;
        });

    // rule 3: read from inbound buffer into pipe (delivers data to the application)
    _eventloop.add_rule(
        _thread_data,
        Direction::Out,
        [&] {
            ByteStream &inbound = _tcp->inbound_stream();

            // Write from the inbound_stream into the pipe, at most 65536 bytes at a time
            const size_t amount_to_write = min(size_t(65536), inbound.buffer_size());
            const std::string buffer = inbound.peek_output(amount_to_write);

            // only pop what the socket actually accepted
            const auto bytes_written = _thread_data.write(move(buffer), false);
            inbound.pop_output(bytes_written);

            if (inbound.eof() or inbound.error()) {
                _thread_data.shutdown(SHUT_WR);
                _inbound_shutdown = true;
            }
        },
        [&] {
            return (not _tcp->inbound_stream().buffer_empty()) or
                   ((_tcp->inbound_stream().eof() or _tcp->inbound_stream().error()) and not _inbound_shutdown);
        });

    // rule 4: read outbound segments from TCPConnection and send as datagrams
    _eventloop.add_rule(
        _datagram_adapter,
        Direction::Out,
        [&] {
            // drain the queue by writing each segment to the adapter
            while (not _tcp->segments_out().empty()) {
                _datagram_adapter.write(_tcp->segments_out().front());
                _tcp->segments_out().pop();
            }
        },
        [&] { return not _tcp->segments_out().empty(); });
}
//! Indicates whether a rule is interested in reading (In) or writing (Out);
//! the enumerators take the values of the corresponding poll event flags.
enum class Direction : short {
    In = POLLIN,   //!< Callback will be triggered when Rule::fd is readable.
    Out = POLLOUT  //!< Callback will be triggered when Rule::fd is writable.
};
cerr << "DEBUG: Listening for incoming connection...\n"; // 等待直到ESTABLISHED。注意下这里的状态条件 // 其中各种收发报文的事件由tcp_loop中的event做 _tcp_loop([&] { constauto s = _tcp->state(); return (s == TCPState::State::LISTEN or s == TCPState::State::SYN_RCVD or s == TCPState::State::SYN_SENT); }); cerr << "New connection from " << _datagram_adapter.config().destination.to_string() << ".\n";
protected: //! Adapter to underlying datagram socket (e.g., UDP or IP) AdaptT _datagram_adapter;
// Convenience aliases: TCPSpongeSocket instantiated with each datagram adapter type.
using TCPOverUDPSpongeSocket = TCPSpongeSocket<TCPOverUDPSocketAdapter>; using TCPOverIPv4SpongeSocket = TCPSpongeSocket<TCPOverIPv4OverTunFdAdapter>; using TCPOverIPv4OverEthernetSpongeSocket = TCPSpongeSocket<TCPOverIPv4OverEthernetAdapter>;
// Aliases for the "Lossy" adapter variants (presumably adapters that drop datagrams -- confirm).
using LossyTCPOverUDPSpongeSocket = TCPSpongeSocket<LossyTCPOverUDPSocketAdapter>; using LossyTCPOverIPv4SpongeSocket = TCPSpongeSocket<LossyTCPOverIPv4OverTunFdAdapter>;