CS144 Lab 0 - Networking Warmup

The main task of this lab is to implement a reliable byte stream.

Why a "reliable" byte stream?

The Internet itself only provides the service of "best-effort" (unreliable) datagrams. In other words, the network tries to deliver packets but makes no guarantees, so reliability has to be built on top of it.

What is a reliable byte stream?

Concept

  • A reliable byte stream is a channel with the FIFO property that carries data: bytes come out in exactly the order they went in.
  • It has a write end, or "input" side, and a read end, or "output" side.

Goals

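  • From a memory standpoint, the channel has finite capacity: at any moment it may hold no more than the given capacity, which makes sense as a way to bound memory usage. From the user's standpoint, however, the stream can be arbitrarily long, because space freed by popped bytes can be reused for new ones.
  • The lab handout states that the byte stream is only used from a single thread, so there is no need to worry about concurrency.
  • Throughput must exceed 0.1 Gbit/s.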
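The first goal comes down to one invariant, written here with the counters from the Writer/Reader interfaces (the same relation appears later in Writer::available_capacity()):

\[
\text{bytes\_buffered} = \text{bytes\_pushed} - \text{bytes\_popped} \le \text{capacity},
\qquad
\text{available\_capacity} = \text{capacity} - \text{bytes\_buffered}
\]

For example, with capacity 4, after 10 bytes have been pushed and 8 popped, 2 bytes are buffered and 2 more can be pushed. Ten bytes have passed through a 4-byte buffer, and only the difference of the two counters is ever bounded.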

We need to implement the member functions of these classes:

Writer / Reader
```cpp
class Writer : public ByteStream
{
public:
  void push( std::string data ); // Push data to stream, but only as much as available capacity allows.
  void close();                  // Signal that the stream has reached its ending. Nothing more will be written.

  bool is_closed() const;              // Has the stream been closed?
  uint64_t available_capacity() const; // How many bytes can be pushed to the stream right now?
  uint64_t bytes_pushed() const;       // Total number of bytes cumulatively pushed to the stream
};

class Reader : public ByteStream
{
public:
  std::string_view peek() const; // Peek at the next bytes in the buffer
  void pop( uint64_t len );      // Remove `len` bytes from the buffer

  bool is_finished() const;        // Is the stream finished (closed and fully popped)?
  uint64_t bytes_buffered() const; // Number of bytes currently buffered (pushed and not popped)
  uint64_t bytes_popped() const;   // Total number of bytes cumulatively popped from stream
};
```

where ByteStream is declared as:

ByteStream
```cpp
class ByteStream
{
public:
  explicit ByteStream( uint64_t capacity );

  // Helper functions (provided) to access the ByteStream's Reader and Writer interfaces
  Reader& reader();
  const Reader& reader() const;
  Writer& writer();
  const Writer& writer() const;

  void set_error() { error_ = true; };       // Signal that the stream suffered an error.
  bool has_error() const { return error_; }; // Has the stream had an error?

  // (protected state such as capacity_ and error_ omitted here; see byte_stream.hh below)
};
```

Approach

Since the channel is FIFO, a queue-like data structure is the natural starting point.

Drawbacks of std::queue<char> / std::deque<char>

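The starter code uses std::string to represent bytes, so here each char stands for one byte. Given the FIFO property, std::queue<char> / std::deque<char> would work, but it is not an ideal fit, because both Reader::peek() and Reader::pop(uint64_t len) involve reading multiple bytes from the stream "contiguously".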

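One thing to note: Reader::peek() should expose as many bytes as possible in one call, not just one byte at a time. A user reads by calling peek and then pop, so the more bytes each peek returns, the more bytes each pop can remove.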
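A minimal sketch of that peek-then-pop loop. ChunkedReader is hypothetical (it is not the lab's Reader): it simply holds a string and lets peek() expose at most max_peek bytes per call, so we can compare how many rounds drain() needs for different peek sizes.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <string>
#include <string_view>
#include <utility>

// Hypothetical reader for illustration only: peek() exposes at most
// `max_peek` of the remaining bytes per call.
class ChunkedReader
{
public:
  ChunkedReader( std::string data, uint64_t max_peek ) : data_( std::move( data ) ), max_peek_( max_peek )
  {
    assert( max_peek_ > 0 ); // a zero-length peek would make drain() loop forever
  }

  std::string_view peek() const
  {
    std::string_view rest { data_ };
    rest.remove_prefix( popped_ );
    return rest.substr( 0, max_peek_ ); // substr clamps to what is left
  }

  void pop( uint64_t len ) { popped_ += std::min<uint64_t>( len, data_.size() - popped_ ); }
  uint64_t bytes_buffered() const { return data_.size() - popped_; }

private:
  std::string data_;
  uint64_t max_peek_;
  uint64_t popped_ { 0 };
};

// Peek whatever is available, copy it out, then pop exactly that many bytes.
// Returns the number of peek/pop rounds needed to empty the reader.
template<class R>
uint64_t drain( R& reader, std::string& out )
{
  uint64_t rounds = 0;
  while ( reader.bytes_buffered() > 0 ) {
    const std::string_view chunk = reader.peek();
    out.append( chunk.data(), chunk.size() );
    reader.pop( chunk.size() );
    ++rounds;
  }
  return rounds;
}
```

Draining 10,000 bytes takes 10,000 rounds when every peek returns a single byte, but only 3 rounds (4096 + 4096 + 1808) when a peek may return up to 4096 bytes.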

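An implementation based on std::queue<char> / std::deque<char> cannot peek more than one byte at a time: std::queue only exposes front(), and std::deque does not store its elements contiguously, so there is no way to return a std::string_view spanning several bytes. With std::queue<char>, popping n bytes also takes n separate pop() calls.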
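To make the limitation concrete, here is a hypothetical DequeBuffer backed by std::deque<char>. Because the deque's elements are not guaranteed to be contiguous, the only std::string_view it can safely hand out covers a single byte. Popping several bytes is still possible with erase(); it is peek() that cannot grow.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <string>
#include <string_view>

// Hypothetical std::deque<char>-backed buffer, for illustration only.
class DequeBuffer
{
public:
  void push( const std::string& data ) { buf_.insert( buf_.end(), data.begin(), data.end() ); }

  // Only the front element is known to be addressable on its own,
  // so the view can never be longer than one byte.
  std::string_view peek() const
  {
    if ( buf_.empty() ) {
      return {};
    }
    return { &buf_.front(), 1 };
  }

  // Remove up to `len` bytes from the front.
  void pop( uint64_t len )
  {
    len = std::min<uint64_t>( len, buf_.size() );
    buf_.erase( buf_.begin(), buf_.begin() + static_cast<std::ptrdiff_t>( len ) );
  }

  uint64_t bytes_buffered() const { return buf_.size(); }

private:
  std::deque<char> buf_;
};
```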

Drawbacks of std::queue<std::string> / std::deque<std::string>

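Alternatively, we could use something like std::queue<std::string> / std::deque<std::string>, together with an index recording how much of the front element has already been consumed, as shown below: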

```text
e.g. std::deque<std::string>:

           next_read
           |
Front = abcdeb - bcdef - hjklasdf = Back
        ***---   -----   --------

*: bytes that have been consumed.
-: bytes to consume.
next_read: the index that points to the next byte to consume.

Once every byte of the first string (the front of the queue) has been consumed, that element can be removed.
```
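A minimal sketch of this scheme, assuming a hypothetical ChunkQueue (not the lab's API) whose next_read_ plays the role of next_read in the diagram. However many bytes are buffered, peek() can only return the unread tail of the front std::string.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <deque>
#include <string>
#include <string_view>
#include <utility>

// Hypothetical std::deque<std::string>-backed buffer, for illustration only.
class ChunkQueue
{
public:
  void push( std::string data )
  {
    if ( !data.empty() ) { // an empty chunk at the front would make peek() look empty
      buffered_ += data.size();
      chunks_.push_back( std::move( data ) );
    }
  }

  // Only the unread part of the front chunk can be exposed as one view.
  std::string_view peek() const
  {
    if ( chunks_.empty() ) {
      return {};
    }
    return std::string_view { chunks_.front() }.substr( next_read_ );
  }

  void pop( uint64_t len )
  {
    len = std::min( len, buffered_ );
    buffered_ -= len;
    while ( len > 0 ) {
      assert( !chunks_.empty() ); // guaranteed by the clamp above
      const uint64_t left_in_front = chunks_.front().size() - next_read_;
      const uint64_t step = std::min( len, left_in_front );
      next_read_ += step;
      len -= step;
      if ( next_read_ == chunks_.front().size() ) { // front chunk fully consumed
        chunks_.pop_front();
        next_read_ = 0;
      }
    }
  }

  uint64_t bytes_buffered() const { return buffered_; }

private:
  std::deque<std::string> chunks_;
  uint64_t next_read_ { 0 }; // index of the next unread byte in chunks_.front()
  uint64_t buffered_ { 0 };
};
```

Replaying the diagram: after pushing abcdeb, bcdef and hjklasdf and popping 3 bytes, peek() returns deb even though 16 bytes are buffered.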

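This approach has a drawback too. In the diagram above, a peek can only reach into the first std::string (abcdeb), i.e. its unconsumed tail deb; it cannot cover the first two strings at once. Although std::deque supports random access, exposing both would require concatenating the first two std::strings into a new std::string and then creating a std::string_view over it. That is not good practice for std::string_view, which is meant to be a non-owning slice of an existing string; moreover, a view of a string built inside the const peek() would dangle unless the copy were stored somewhere.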

Peeking only part of the buffered data is still correct; it just caps how much a single peek can return.

Ring Buffer

Instead, we can use a ring-buffer-like structure (a circular buffer, or circular queue). A ring buffer preserves the FIFO property and has these additional advantages:

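  • It can peek more bytes than the scheme above. In my first version I misunderstood peek and returned only one byte per call, so the speed test reached only ~0.5 Gbit/s (pop length 4096). After changing it to peek as many bytes as possible, it reached ~20 Gbit/s. Both versions are \(O(n)\) overall, yet the second is roughly 40 times faster.
  • A ring buffer stores its data in contiguous memory, so by the principle of locality its memory accesses are more likely to hit the cache. It also allocates memory only once, avoiding the cost of repeated allocations. Compared with the previous point, though, this contributes much less to performance.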
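The index arithmetic behind a ring buffer can be sketched with two hypothetical helpers (ring_slot and contiguous_readable are illustrative names, not part of the lab). They assume logical offsets counted from the start of the stream, which is what total_push_ / total_pop_ track in the implementation; there, idx_pop_ always equals total_pop_ % capacity_.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

// Physical slot holding the byte at logical stream offset `offset`.
// Offsets grow without bound; slots repeat every `capacity` bytes.
uint64_t ring_slot( uint64_t offset, uint64_t capacity )
{
  assert( capacity > 0 ); // `% 0` is undefined
  return offset % capacity;
}

// How many bytes one peek() can expose: the readable run starts at the read
// slot and stops at whichever comes first, the end of the buffered data or
// the physical end of the array (where the data wraps back to slot 0).
uint64_t contiguous_readable( uint64_t total_popped, uint64_t total_pushed, uint64_t capacity )
{
  const uint64_t buffered = total_pushed - total_popped;
  return std::min( buffered, capacity - ring_slot( total_popped, capacity ) );
}
```

For example, with capacity 8, 10 bytes pushed and 5 popped, 5 bytes are buffered in slots 5, 6, 7, 0, 1, but only 3 of them are contiguous. After those 3 are popped, the next peek returns the remaining 2 from slots 0 and 1.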

Implementation

byte_stream.hh
```cpp
class ByteStream
{
public:
  explicit ByteStream( uint64_t capacity );

  // Helper functions (provided) to access the ByteStream's Reader and Writer interfaces
  Reader& reader();
  const Reader& reader() const;
  Writer& writer();
  const Writer& writer() const;

  void set_error() { error_ = true; };       // Signal that the stream suffered an error.
  bool has_error() const { return error_; }; // Has the stream had an error?

protected:
  // Please add any additional state to the ByteStream here, and not to the Writer and Reader interfaces.
  uint64_t capacity_;
  bool error_ {};
  bool close_requested_ { false };
  size_t idx_push_ { 0 };     // Next index to push a byte.
  size_t idx_pop_ { 0 };      // Next index to pop a byte.
  uint64_t total_push_ { 0 }; // Bytes pushed.
  uint64_t total_pop_ { 0 };  // Bytes popped.
  std::string buf_ {};        // Ring buffer.
};
```
byte_stream.cc
```cpp
#include "byte_stream.hh"

#include <algorithm>
#include <cstring>

using namespace std;

ByteStream::ByteStream( uint64_t capacity ) : capacity_( capacity )
{
  // Allocate the ring buffer once, up front.
  this->buf_.resize( capacity );
}

void Writer::push( string data )
{
  // Accept only as many bytes as currently fit; the rest are dropped.
  size_t size_to_push = std::min<uint64_t>( this->available_capacity(), data.size() );
  this->total_push_ += size_to_push;

  size_t index_to_copy { 0 };
  while ( size_to_push != 0 ) {
    // Copy up to the physical end of the buffer, then wrap around to index 0.
    size_t batch_size = std::min<uint64_t>( this->capacity_ - this->idx_push_, size_to_push );
    memcpy( this->buf_.data() + this->idx_push_, data.data() + index_to_copy, batch_size );

    size_to_push -= batch_size;
    index_to_copy += batch_size;
    this->idx_push_ = ( this->idx_push_ + batch_size ) % this->capacity_;
  }
}

void Writer::close()
{
  this->close_requested_ = true;
}

bool Writer::is_closed() const
{
  return this->close_requested_;
}

uint64_t Writer::available_capacity() const
{
  return this->capacity_ - ( this->total_push_ - this->total_pop_ );
}

uint64_t Writer::bytes_pushed() const
{
  return this->total_push_;
}

string_view Reader::peek() const
{
  // Expose the contiguous run starting at idx_pop_: it ends where the buffered
  // data ends or at the physical end of the buffer, whichever comes first.
  size_t len = std::min<uint64_t>( this->bytes_buffered(), this->capacity_ - this->idx_pop_ );
  return { this->buf_.data() + this->idx_pop_, len };
}

void Reader::pop( uint64_t len )
{
  // Never pop more than is buffered; this also avoids `% 0` when capacity_ is 0.
  len = std::min( len, this->bytes_buffered() );
  if ( len == 0 ) {
    return;
  }
  this->idx_pop_ = ( this->idx_pop_ + len ) % this->capacity_;
  this->total_pop_ += len;
}

bool Reader::is_finished() const
{
  return this->close_requested_ && this->total_push_ == this->total_pop_;
}

uint64_t Reader::bytes_buffered() const
{
  return this->total_push_ - this->total_pop_;
}

uint64_t Reader::bytes_popped() const
{
  return this->total_pop_;
}
```

Results

```text
-> % cmake --build build --target check0
Test project /home/yuwei/Workspace/minnow/build
Start 1: compile with bug-checkers
1/11 Test #1: compile with bug-checkers ........ Passed 0.41 sec
Start 2: t_webget
2/11 Test #2: t_webget ......................... Passed 1.26 sec
Start 3: byte_stream_basics
3/11 Test #3: byte_stream_basics ............... Passed 0.01 sec
Start 4: byte_stream_capacity
4/11 Test #4: byte_stream_capacity ............. Passed 0.01 sec
Start 5: byte_stream_one_write
5/11 Test #5: byte_stream_one_write ............ Passed 0.01 sec
Start 6: byte_stream_two_writes
6/11 Test #6: byte_stream_two_writes ........... Passed 0.01 sec
Start 7: byte_stream_many_writes
7/11 Test #7: byte_stream_many_writes .......... Passed 0.06 sec
Start 8: byte_stream_stress_test
8/11 Test #8: byte_stream_stress_test .......... Passed 0.03 sec
Start 37: no_skip
9/11 Test #37: no_skip .......................... Passed 0.01 sec
Start 38: compile with optimization
10/11 Test #38: compile with optimization ........ Passed 0.10 sec
Start 39: byte_stream_speed_test
ByteStream throughput (pop length 4096): 21.90 Gbit/s
ByteStream throughput (pop length 128): 17.26 Gbit/s
ByteStream throughput (pop length 32): 10.02 Gbit/s
11/11 Test #39: byte_stream_speed_test ........... Passed 0.26 sec

100% tests passed, 0 tests failed out of 11

Total Test time (real) = 2.17 sec
Built target check0
```