实验目的
背景介绍 在这个实验中和下一个实验中,我们需要实现TCP receiver。它接收数据报,把它们转成可靠的字节流,以供socket读取。 TCP sender把字节流分成不同的segments,每个segment都封装成数据报。但是网络传送可能会丢失,重复或失序,这就需要TCP receiver重新组合,还原最初的字节流。
实验目的 完成一个数据结构StreamReassembler
。它接收一些子串(字节流),以及每个子串第一个字节所在字节流的索引。StreamReassembler
有一个成员ByteStream
,只要StreamReassembler
知道了字节流的下一个字节,就会把数据写入ByteStream
中。
实现思路
如上图所示,`capacity`是整个缓存的大小,也就是`ByteStream`的大小,蓝色部分表示已经写入并被`ByteStream`读出来的部分;绿色部分表示已经写入`ByteStream`还没有被读的部分;红色部分是需要我们在一个buffer中缓存起来的部分,就是接收到的字节流,但这一部分字节流只是已经被缓存,但还没有重组,等到连续时一并写入`ByteSream`中。
我们还记得,也就是`ByteStream`中有下面几个成员:
1 2 3 4 size_t capacity_;size_t read_size_; size_t write_size_; bool end_input_;
所以就是说:
上图中的first unread是read_size_
上图的first unassembled是write_size_
,表示第一个可以写到ByteStream
中的字节
上图的first unacceptable是read_size_ + capacity_
所以有以下几种可能性:
如果子字节流的index
落在first unacceptable之后,那么这个substring应该被丢弃。
如果子字节流全部落在[0, first unassembled - 1]中,那么这个substring已经写入了ByteStream
中,也应该丢弃
除此之外,应该截断子字节流,使之完全落在[first unassembled - 1, first unacceptable - 1]中
完成了上述操作后,应该处理字节流区间重复的问题,必须要保证set<Segment> _buffer
中每个Segment
都必须彼此不重复,具体思路见代码。
代码实现 首先定义Segment
结构体。
1 2 3 4 5 6 7 8 9 10 11 12 13 struct Segment { public: size_t idx_; std ::string data_; Segment():idx_(0 ),data_("" ){} Segment(size_t idx, std ::string data):idx_(idx), data_(data){} bool operator<(const Segment& seg) const { return this->idx_ < seg.idx_; } };
为StreamReassembler
添加成员:
1 2 3 4 5 6 7 8 9 10 11 std ::set <Segment> _buffer;bool _eof;size_t _eof_idx;size_t _unassembled_bytes; void handle_substring (const std ::string &data, const uint64_t index) ;void handle_overlap (Segment& seg) ;void adjustment (Segment& seg, const std ::set <Segment>::iterator &it) ;void buffer_erase (const std ::set <Segment>::iterator &it) ;void buffer_insert (Segment& seg) ;
上面的 _buffer
只存储位于上图中红色的区间Segment
,红色区间的大小就为_unassembled_bytes
。
_eof_idx
就是当最后一个结束符号的索引,当 _output
写的字节数(bytes_written()
)等于 _eof_idx
时,应该结束输入,调用 _output.end_input()
函数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 #include "stream_reassembler.hh" template <typename... Targs>void DUMMY_CODE (Targs &&... ) {} using namespace std ; StreamReassembler::StreamReassembler(const size_t capacity) : _output(capacity), _capacity(capacity), _buffer(), _eof(false ), _eof_idx(0 ), _unassembled_bytes(0 ) { }void StreamReassembler::handle_substring (const string &data, const size_t index) { auto seg = Segment{index, data}; if (seg.idx_ >= _output.bytes_read() + _capacity){ return ; } if (seg.idx_ < _output.bytes_read() + _capacity && seg.idx_ + seg.data_.length() - 1 >= _output.bytes_read() + _capacity){ seg.data_ = seg.data_.substr(0 , _output.bytes_read() + _capacity - seg.idx_); } if (seg.idx_ + seg.data_.length() - 1 < _output.bytes_written()){ return ; } if (seg.idx_ < _output.bytes_written() && seg.idx_ + seg.data_.length() - 1 >= _output.bytes_written()){ seg.data_ = seg.data_.substr(_output.bytes_written() - seg.idx_); seg.idx_ = _output.bytes_written(); } if (_buffer.empty()){ buffer_insert(seg); return ; } handle_overlap(seg); }void StreamReassembler::adjustment (Segment& seg, const std ::set <Segment>::iterator& it) { size_t it_l = it->idx_; size_t it_r = it->idx_ + it->data_.length() - 1 ; size_t seg_l = seg.idx_; size_t seg_r = seg.idx_ + seg.data_.length() - 1 ; if (seg_l <= it_l && seg_r >= it_r){ return ; } if (it_l <= seg_l && it_r >= seg_r){ seg.idx_ = it_l; seg.data_ = it->data_; return ; } if (seg_l >= it_l && seg_r > it_r){ seg.data_ = it->data_ + seg.data_.substr(it->idx_ + it->data_.length() - seg.idx_); seg.idx_ = it->idx_; return ; } if (it_l > seg_l && it_r >= seg_r){ seg.data_ = seg.data_.substr(0 , it->idx_ - seg.idx_) + it->data_; return ; } }void StreamReassembler::handle_overlap (Segment& seg) { auto it = _buffer.begin(); for (; it != _buffer.end();){ size_t it_l = it->idx_; size_t it_r = it->idx_ + it->data_.length() - 1 ; size_t seg_l = seg.idx_; size_t seg_r = seg.idx_ + seg.data_.length() - 1 ; if ((it_l >= seg_l && it_l <= seg_r) || (seg_l >= it_l && seg_l <= it_r)){ adjustment(seg, it); buffer_erase(it++); }else { it++; } } buffer_insert(seg); }void StreamReassembler::buffer_erase (const std ::set <Segment>::iterator& it) { _unassembled_bytes -= it->data_.length(); _buffer.erase(it); }void StreamReassembler::buffer_insert (Segment& seg) { _unassembled_bytes += seg.data_.length(); _buffer.insert(seg); }void StreamReassembler::push_substring (const string &data, const size_t index, const bool eof) { if (!data.empty()){ handle_substring(data, index); } while (!_buffer.empty() && _buffer.begin()->idx_ == _output.bytes_written()){ auto it = _buffer.begin(); _output.write(it->data_); buffer_erase(it); } if (eof){ _eof = true ; _eof_idx = index + data.length(); } if (_eof && _output.bytes_written() == _eof_idx){ _output.end_input(); } }size_t StreamReassembler::unassembled_bytes () const { return _unassembled_bytes; }bool StreamReassembler::empty () const { return _buffer.empty(); }