6.1810 Lab - Networking

实验指导:Lab - Networking

Part One: NIC

这个任务需要实现 E1000 网卡驱动。具体地,kernel/e1000.c 中的 e1000_transmite1000_recv 两个函数。

NIC 驱动执行的两个上下文

Many device drivers execute code in two contexts: a top half that runs in a process's kernel thread, and a bottom half that executes at interrupt time.

如 xv6 book 所描述的那样,这里我们实现的 NIC 的驱动也运行在两个不同的上下文环境:进程的内核态、中断时。

e1000_recv 会在中断上下文中被调用:当有新的以 ethernet frame 来临的时候,E1000 会通过 DMA 的方式把接受到的包写入内存中的缓冲区,然后通过中断的方式告诉 xv6 有新的数据包来了,xv6 陷入内核,通过 usertrap - devintr - e1000_intr - e1000_recv 调用,e1000_recv 负责处理收到的 ethernet frame,并把 ethernet frame 拆解成 IP / ARP / UDP 等数据做进一步处理。

e1000_transmit 会在两个上下文中被调用到:

  • 一是用户程序发包的时候,使用 send 系统调用,sys_send 中把数据按照 UDP - IP - Ethernet 的方式一层一层打包,最后把打包好的数据通过 NIC 提供的 e1000_transmit 发送出去。这里的运行上下文是用户程序的内核态。
  • 第二个场景是 e1000_recv 会使用 net_rx 来接收 ethernet frame,net_rx 会根据 ethernet frame header 中的 type 信息来分流给 ip_rx 和 arp_rx。如果是一个 ARP 请求包的话,arp_rx 需要回复一个 ARP 响应,这个时候会调用 e1000_transmit 发包。这里的运行上下文是中断时。

e1000_transmit

e1000_transmit(char *buf, int len),负责让 E1000 发送出 buf 指向的内容,长度为 len bytes。

对于发包的场景,E1000 管理一个环形缓冲区。这个环形缓冲区用来存放 Transmit Descriptor 这个数据结构,在 xv6 的代码中它叫 tx_desc,这个结构在手册的 3.4 节中有说明。xv6 会在加载 E1000 驱动的时候运行初始化代码 e1000_init,这里会把这个环形缓冲区的地址写入到 E1000 的 TDBAL 寄存器内,写网卡寄存器是通过 MMIO 来实现的。这里还会初始化其他信息,比如这个缓冲区的长度,这个环形缓冲区(即循环队列)的队头和队尾等等。

我们需要把待发的包的元数据,即一个 tx_desc 实例写入这个环形缓冲区中,它会告诉网卡:待发数据的起始地值、长度、发送行为等等信息。这些信息都用来告诉网卡如何发送这些数据。接着我们要修改 TDT 寄存器(环形缓冲区的队尾),这一步既是操作队列,也是一种对网卡的通知,3.4.1 中写道:

When the on-chip buffer is empty, a fetch happens as soon as any descriptors are made available (software writes to the tail pointer).

这个机制有点类似于条件变量的唤醒。看上去像是修改 TDT 后有一次边沿触发。

下面我们就可以实现这个函数了。需要注意的是:

  1. 要给 tx_ring 上锁。因为可能同时有两个不同的进程发包,并且这两个进程并行地运行在两个 CPU 上。
  2. 写入 descriptor 结构前要检查当前 tx_ring 上写入位置的状态。如果没有置位 E1000_TXD_STAT_DD(Descriptor Done Bit),则说明网卡还没有处理完当前的位置,不能写入。
  3. 要释放已经发完的包的内存。
  4. 写入 descriptor 结构时,要清空状态,并且设置 E1000_TXD_CMD_EOP | E1000_TXD_CMD_RS 行为。EOP 代表当前描述符是构成一个完整数据包的最后一个描述符,RS 代表处理完该描述符后写回状态信息到主机内存的描述符中(例如 DD 位,这样下次发包就可以判断当前位置是否可写)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
int
e1000_transmit(char *buf, int len)
{
//
// Your code here.
//
// buf contains an ethernet frame; program it into
// the TX descriptor ring so that the e1000 sends it. Stash
// a pointer so that it can be freed after send completes.
//

acquire(&e1000_lock);

uint32 idx = regs[E1000_TDT];

if (!(tx_ring[idx].status & E1000_TXD_STAT_DD)) {
release(&e1000_lock);
return -1; // No descriptors available
}

if (tx_ring[idx].addr) {
kfree((void*)tx_ring[idx].addr);
}

tx_ring[idx].addr = (uint64)buf;
tx_ring[idx].length = len;
tx_ring[idx].cmd = E1000_TXD_CMD_EOP | E1000_TXD_CMD_RS;
tx_ring[idx].status = 0;

regs[E1000_TDT] = (idx + 1) % TX_RING_SIZE;

release(&e1000_lock);

return 0;
}

e1000_recv

E1000 收包时,会先把数据 DMA 到内存,然后修改 rx_ring 的部分内容(状态、地址等),然后通过中断告诉 xv6 有新的数据来了。

接着 xv6 就会处理这个中断,经过层层调用(usertrap - devintr - e1000_intr - e1000_recv)来到 e1000_recv 这个函数。

e1000_recv 需要处理(接收)RDT(接收描述符队列 rx_ring 队尾)的数据,然后调用 net_rx 处理数据。net_rx 接收 ethernet frame,分流给 arp_rx 和 ip_rx,ip_rx 会继续分流给传输层协议。

与 e1000_transmit 相似地,我们可以实现 e1000_recv。需要注意:

  1. 同样需要检测 DD 位。
  2. net_rx 会把 rx_ring descriptor 中的 addr 指向的内存页面给 kfree 掉,所以我们需要在 net_rx 结束后分配一个新的 page 并重置描述符。
  3. 实验描述中说「The e1000 can deliver more than one packet per interrupt; your e1000_recv should handle that situation.」所以这里要循环收包,把能收的都收住。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
static void
e1000_recv(void)
{
//
// Your code here.
//
// Check for packets that have arrived from the e1000
// Create and deliver a buf for each packet (using net_rx()).
//

while (1) {
uint32 idx = (regs[E1000_RDT] + 1) % RX_RING_SIZE;

if (!(rx_ring[idx].status & E1000_RXD_STAT_DD)) {
return;
}

net_rx((char*)rx_ring[idx].addr, rx_ring[idx].length);

// net_rx(...) will kfree the buffer
// We therefore need to kalloc a new one
if (!(rx_ring[idx].addr = (uint64)kalloc())) {
panic("e1000_recv kalloc failed");
}

rx_ring[idx].status = 0;

regs[E1000_RDT] = idx;
}
}

Part Two: UDP Receive

这部分的任务是实现 sys_recv 和 sys_bind 两个系统调用,其中 sys_recv 的目标是实现 UDP 的接收。

生产者-消费者

这里对于每一个被绑定的 port,都存在一对「生产者-消费者」。

这里的生产者是 net_rx 函数:每当网卡检测到有新的数据可以被 receive 的时候,会触发一个中断,中断处理函数会调用 net_rx,net_rx 会把 ethernet_frame 按照协议分流。那么对于 UDP 来说,net_rx 会分流给 ip_rx,接着 ip_rx 需要分流给一个 UDP 协议的接收函数,这个函数是我们要实现的,这里我们把它叫做 udp_rx。udp_rx 应该把 UDP 包的数据放入一个队列中,等待用户程序消费。

这里的消费者是 sys_recv 系统调用:用户程序主动调用这个系统调用来消费队列中的 UDP 包,如果队列为空则等待生产者向队列中推送数据,即等待新数据来的中断。这里的等待需要用到 kernel/proc.c 中的 sleep - wakeup 机制,有点像条件变量。

端口集合

对于每一个被绑定的端口,我们都要维护一个队列,这里我们可以用 ring buffer 来实现。并且我们需要维护一个已经绑定的端口的集合,这样生产者和消费者才能定位到一个具体的队列。

这样每次调用 sys_bind 的时候就是开始启用这个集合中的一个新的槽位。sys_unbind 就是释放一个制定的槽位。

这里我就简单地用数组来实现这个集合,因为:如果只是为了通过 xv6 的测试,这里我们不需要预留很多槽位。

对于每个槽位,我们需要维护端口号以及刚刚我们提到的 ring buffer。ring buffer 需要维护目前队列中元素的个数(flighting packs),头、尾指针,以及每个 pack 的数据。对于每个 pack,我们要维护源 IP,源端口号,UDP 数据,以及 receive 结束后我们需要释放的缓冲区的首地址。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#define MAX_PORT_SLOTS 20

static struct {
uint16 port;
uint8 in_flight;
uint8 r_idx;
uint8 w_idx;
struct spinlock port_lock;
struct {
int src_ip;
short sport;
char *buf;
char *buf_owner;
int len;
} ring[16];
} ports_info[MAX_PORT_SLOTS];

我们需要初始化这些槽位。

1
2
3
4
5
6
7
8
9
void
netinit(void)
{
initlock(&netlock, "netlock");
memset(ports_info, 0, sizeof ports_info);
for (int i = 0; i < NPROC; ++i) {
initlock(&ports_info[i].port_lock, "net_portlock");
}
}

于是我们可以实现 sys_bind:

sys_bind
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
uint64
sys_bind(void)
{
//
// Your code here.
//

int port;

argint(0, &port);

for (int i = 0; i < MAX_PORT_SLOTS; ++i) {
acquire(&ports_info[i].port_lock);

if (ports_info[i].port != 0) {
release(&ports_info[i].port_lock);
continue;
}

ports_info[i].in_flight = 0;
ports_info[i].r_idx = 0;
ports_info[i].w_idx = 0;
ports_info[i].port = port;
memset(ports_info[i].ring, 0, sizeof(ports_info[i].ring));
release(&ports_info[i].port_lock);
break;
}

return 0;
}

ip_rx / udp_rx

下面我们就可以按照这个思路来修改生产者端的 ip_rx 函数,并且加入 udp_rx 函数。

ip_rx
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
void
ip_rx(char *buf, int len)
{
// don't delete this printf; make grade depends on it.
static int seen_ip = 0;
if(seen_ip == 0)
printf("ip_rx: received an IP packet\n");
seen_ip = 1;

//
// Your code here.
//

struct eth *ineth = (struct eth*) buf;
struct ip *inip = (struct ip*) (ineth + 1);

switch (inip->ip_p) {
case IPPROTO_UDP:
udp_rx(buf, len);
break;
default:
kfree(buf);
break;
}
}
udp_rx
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
void
udp_rx(char *buf, int len)
{
struct eth *ineth = (struct eth*) buf;
struct ip *inip = (struct ip*)(ineth + 1);
struct udp *inudp = (struct udp*)(inip + 1);

uint32 src_ip = ntohl(inip->ip_src);
uint16 dport = ntohs(inudp->dport);
uint16 sport = ntohs(inudp->sport);
uint16 blen = ntohs(inudp->ulen) - sizeof(struct udp);

int i;
int port_idx = -1;
for (i = 0; i < MAX_PORT_SLOTS; ++i) {
acquire(&ports_info[i].port_lock);
if (ports_info[i].port != dport) {
release(&ports_info[i].port_lock);
continue;
} else {
port_idx = i;
release(&ports_info[i].port_lock);
break;
}
}

// drop the pack when the port is not bound
if (port_idx == -1) {
kfree(buf);
return;
}

acquire(&ports_info[port_idx].port_lock);

if (ports_info[port_idx].in_flight >= 16) {
kfree(buf);
release(&ports_info[port_idx].port_lock);
return;
}

uint8 w_idx = ports_info[port_idx].w_idx;
ports_info[port_idx].ring[w_idx].src_ip = src_ip;
ports_info[port_idx].ring[w_idx].sport = sport;
ports_info[port_idx].ring[w_idx].buf = (char*)(inudp + 1);
ports_info[port_idx].ring[w_idx].len = blen;
ports_info[port_idx].ring[w_idx].buf_owner = buf;

ports_info[port_idx].in_flight++;
ports_info[port_idx].w_idx = (w_idx + 1) % 16;

release(&ports_info[port_idx].port_lock);
wakeup(&ports_info[port_idx]);
return;
}

sys_recv

接着我们可以实现消费者端的 sys_recv 了。sys_recv 要做的事情就是把 ring buffer 中的数据复制到指定的用户地址空间中(使用 copyout),然后把内核中为接收这个 pack 分配的页面给释放掉。

sys_recv
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
uint64
sys_recv(void)
{
//
// Your code here.
//

int dport;
uint64 p_src_ip;
uint64 p_src_port;
uint64 p_dest_buf;
int maxlen;
int i;
int port_idx = -1;

struct proc *p = myproc();
pagetable_t pt = p->pagetable;

argint(0, &dport);
argaddr(1, &p_src_ip);
argaddr(2, &p_src_port);
argaddr(3, &p_dest_buf);
argint(4, &maxlen);

for (i = 0; i < MAX_PORT_SLOTS; ++i) {
acquire(&ports_info[i].port_lock);
if (ports_info[i].port != dport) {
release(&ports_info[i].port_lock);
continue;
} else {
port_idx = i;
release(&ports_info[i].port_lock);
break;
}
}

// returns err when the port wasn't bound
if (port_idx == -1) {
return -1;
}

acquire(&ports_info[port_idx].port_lock);

while (ports_info[port_idx].in_flight == 0) {
sleep(&ports_info[port_idx], &ports_info[port_idx].port_lock);
}

int r_idx = ports_info[port_idx].r_idx;
copyout(pt, p_src_ip, (char*) &ports_info[port_idx].ring[r_idx].src_ip, sizeof(uint32));
copyout(pt, p_src_port, (char*) &ports_info[port_idx].ring[r_idx].sport, sizeof(uint16));

int b_copied = maxlen;
if (ports_info[port_idx].ring[r_idx].len < b_copied) {
b_copied = ports_info[port_idx].ring[r_idx].len;
}
copyout(pt, p_dest_buf, ports_info[port_idx].ring[r_idx].buf, b_copied);

kfree(ports_info[port_idx].ring[r_idx].buf_owner);
ports_info[port_idx].r_idx = (r_idx + 1) % 16;
ports_info[port_idx].in_flight--;
release(&ports_info[port_idx].port_lock);

return b_copied;
}

实验结果