6.1810 Lab - Locks

实验指导:Lab - Locks

这个实验的两个任务是通过修改数据结构和锁策略来优化并发程序,减少竞争。

Memory Allocator

kernel/kalloc.c 中实现的 memory allocator 维护了一个链表 freelist,链表维护了闲置的(可分配的)页面。在调用者调用 kalloc 的时候,kalloc 尝试从 freelist 中拿到一个页面分配给调用者。由于 xv6 是在多 CPU 上执行的,所以 freelist 是和多个 CPU 共享的,被一个 spinlock 保护。这样,在多个 CPU 频繁调用 kalloc 的时候会产生激烈的竞争,这是不够高效的。

这个任务要求优化 kalloc 的设计,实现 per-CPU 的 freelist。满足:

  • kalloc 的时候从当前 CPU 的内存池里进行分配,如果当前内存池里没有闲置页,就去其他的 CPU 的内存池里偷一个页面。这样在当前 CPU 的内存池里有闲置页的时候不会发生竞争,只有在「偷」的时候才会发生竞争,降低了竞争的可能性。
  • kfree 的时候把页面返回给当前 CPU 的内存池,初始化的时候也只给当前 CPU 的内存池装配页面。其它 CPU 会在发现他们的内存池里没有页面的时候来已经装配页面的池子里偷页面,从而这些页面会被偷到每个 CPU 的池子里。
kernel/kalloc.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
// Physical memory allocator, for user proesses,
// kernel stacks, page-table pages,
// and pipe buffers. Allocates whole 4096-byte pages.

#include "types.h"
#include "param.h"
#include "memlayout.h"
#include "spinlock.h"
#include "riscv.h"
#include "defs.h"

void freerange(void *pa_start, void *pa_end);

extern char end[]; // first address after kernel.
// defined by kernel.ld.

struct run {
struct run *next;
};

struct {
struct spinlock lock;
struct run *freelist;
} kmem[NCPU];

void
kinit()
{
for (int i = 0; i < NCPU; ++i) {
initlock(&kmem[i].lock, "kmem");
}

freerange(end, (void*)PHYSTOP);
}

void
freerange(void *pa_start, void *pa_end)
{
char *p;
p = (char*)PGROUNDUP((uint64)pa_start);
for(; p + PGSIZE <= (char*)pa_end; p += PGSIZE)
kfree(p);
}

// Free the page of physical memory pointed at by pa,
// which normally should have been returned by a
// call to kalloc(). (The exception is when
// initializing the allocator; see kinit above.)
void
kfree(void *pa)
{
struct run *r;

// Disable interrupts for getting CPU ID.
push_off();

int cpu = cpuid();

if(((uint64)pa % PGSIZE) != 0 || (char*)pa < end || (uint64)pa >= PHYSTOP) {
pop_off();
panic("kfree");
}

// Fill with junk to catch dangling refs.
memset(pa, 1, PGSIZE);

r = (struct run*)pa;

acquire(&kmem[cpu].lock);
r->next = kmem[cpu].freelist;
kmem[cpu].freelist = r;
release(&kmem[cpu].lock);

// Re-enable interrupts.
pop_off();
}

// Allocate one 4096-byte page of physical memory.
// Returns a pointer that the kernel can use.
// Returns 0 if the memory cannot be allocated.
void *
kalloc(void)
{
struct run *r;

// Disable interrupts for getting CPU ID
push_off();

int cpu = cpuid();

acquire(&kmem[cpu].lock);
r = kmem[cpu].freelist;
if (r)
kmem[cpu].freelist = r->next;
release(&kmem[cpu].lock);

// No pages are available from current CPU's freelist,
// now let's attempt to steal one from another CPU.
if (!r) {
for (int i = 0; i < NCPU; ++i) {
if (i == cpu) {
continue;
}
acquire(&kmem[i].lock);
r = kmem[i].freelist;
if (r) {
kmem[i].freelist = r->next;
release(&kmem[i].lock);
break;
} else {
release(&kmem[i].lock);
}
}
}

// Re-enable interrupts.
pop_off();

if(r)
memset((char*)r, 5, PGSIZE); // fill with junk

return (void*)r;
}

Buffer Cache

xv6 的文件系统由七层构成。其中 disk 层代表硬盘,block(或 sector)是硬盘上的数据单元。buffer cache 层负责把硬盘上的 block 缓存在内存里,并且同步修改到硬盘。这个任务的重点就是修改 buffer cache 层的代码。

kernel/bio.c 原本的设计是用双向链表实现了一个 LRU 缓存,但是每次探测空闲块并加入一个元素进缓存的操作包含一个线性的查找,这里的效率不是很高。原因之一是它是线性的;原因之二是整个链表使用一个全局的锁维护,所以在对缓存层频繁操作的时候会发生激烈的竞争。

这个任务希望我们重新实现缓存层的数据结构,来减少锁的竞争。大致的思路是把原来的全局锁替换成更小粒度的锁,然后用哈希表来实现这个缓存(那么锁的粒度就变成了 per-bucket),可以不实现 LRU 策略。

这里需要从头实现一个哈希表来代替原来的 bcache 结构。我实现了一个开散列的哈希表,哈希函数是对素数取模。由于开散列的哈希表要实现一个链表,这里需要预分配一些 buffers 给链表节点使用:我们可以直接用原来的 struct buf 结构作为链表节点(因为它天然有 next / prev 指针),预分配一个全局的数数组给链表的节点。任务里说「You must not increase the number of buffers; there must be exactly NBUF (30) of them.」,那么这里数组的长度我们就还分配 NBUF。

接着我们可以用一个类似 freelist 的结构来管理空闲的 buf,这样就可以 \(O(1)\) 的时间复杂度来获取一个空闲块,避免对 bufs 数组的线性查找。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#define BUCKETS_SIZE 29

// Hashmap for cached buffers.
// Open hashing.
struct {
struct buf head;
struct spinlock lock;
} buckets[BUCKETS_SIZE];

// Buffers as linked list nodes for open hashing.
struct buf bufs[NBUF];

// Free list of available blocks.
struct {
struct buf head;
struct spinlock lock;
} freeblocks;

uint hash_blockno(uint blockno) {
return blockno % BUCKETS_SIZE;
}

类似 kalloc.c 中的 kalloc / kfree,我们可以写出在 freeblocks 中分配和释放 blocks 的函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
struct buf* buf_alloc() {
struct buf *b;
acquire(&freeblocks.lock);
if (freeblocks.head.next == &freeblocks.head) {
release(&freeblocks.lock);
return 0;
}

b = freeblocks.head.next;
freeblocks.head.next = b->next;
b->next->prev = &freeblocks.head;
release(&freeblocks.lock);
return b;
}

void buf_free(struct buf *b) {
acquire(&freeblocks.lock);
b->next = freeblocks.head.next;
b->prev = &freeblocks.head;
b->next->prev = b;
b->prev->next = b;
release(&freeblocks.lock);
}

我们在 binit 函数中初始化哈希表和 freeblocks 链表,并把 bufs 数组的内存一个个装载到 freeblocks 上。这里我们按照实验指导的要求,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
void
binit(void)
{
uint i;
memset(buckets, 0, sizeof(buckets));
for (i = 0; i < BUCKETS_SIZE; ++i) {
initlock(&buckets[i].lock, "bcache_buckets");
buckets[i].head.next = &buckets[i].head;
buckets[i].head.prev = &buckets[i].head;
}

memset(&freeblocks, 0, sizeof(freeblocks));
initlock(&freeblocks.lock, "bcache_freeblocks");
freeblocks.head.next = &freeblocks.head;
freeblocks.head.prev = &freeblocks.head;

memset(bufs, 0, sizeof(bufs));
for (i = 0; i < NBUF; ++i) {
initsleeplock(&bufs[i].lock, "bcache_buffer");
buf_free(&bufs[i]);
}
}

下面我们来修改 bget / brelse。

  • bget 先会检查 blockno 所在的桶,看目标块是不是被缓存了。如果没有被缓存,尝试分配一个新的缓存块并且把这块加入桶中。最后需要获取 buffer 的 sleeplock,这个 lock 是用来保护 data 字段的,由于 IO 的时间相对较长,所以这里用了一个 sleeplock 而不是 spinlock。
  • brelse 首先释放这个 sleeplock,接着减少 buffer 的引用计数。如果发现引用计数为 0 了,就把这个 buffer 归还给 freeblocks。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
static struct buf*
bget(uint dev, uint blockno)
{
struct buf *b;

uint hash = hash_blockno(blockno);
acquire(&buckets[hash].lock);

// Is the block already cached?
for (b = buckets[hash].head.next; b != &buckets[hash].head; b = b->next) {
if (b->dev == dev && b->blockno == blockno) {
b->refcnt++;
release(&buckets[hash].lock);
acquiresleep(&b->lock);
return b;
}
}

// Not cached.
// Grab a new buffer from the freeblocks list.
if (!(b = buf_alloc())) {
release(&buckets[hash].lock);
panic("bget: no buffers");
}

b->next = buckets[hash].head.next;
b->prev = &buckets[hash].head;
b->prev->next = b;
b->next->prev = b;
b->dev = dev;
b->blockno = blockno;
b->valid = 0;
b->refcnt = 1;
release(&buckets[hash].lock);
acquiresleep(&b->lock);
return b;
}

void
brelse(struct buf *b)
{
if(!holdingsleep(&b->lock))
panic("brelse");

releasesleep(&b->lock);

uint hash = hash_blockno(b->blockno);
acquire(&buckets[hash].lock);
b->refcnt--;
if (b->refcnt == 0) {
// No one is waiting for it.
// Move this buffer node away from the bucket,
// then free the buffer node
// (returning it back to the freeblocks list).
b->next->prev = b->prev;
b->prev->next = b->next;
buf_free(b);
}
release(&buckets[hash].lock);
}

最后,由于我们删掉了 bcache,还需要修改 bpin / bunpin,让它们按照 blockno 的哈希取锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
void
bpin(struct buf *b) {
uint hash = hash_blockno(b->blockno);
acquire(&buckets[hash].lock);
b->refcnt++;
release(&buckets[hash].lock);
}

void
bunpin(struct buf *b) {
uint hash = hash_blockno(b->blockno);
acquire(&buckets[hash].lock);
b->refcnt--;
release(&buckets[hash].lock);
}

实验结果