0%

xv6-2020-lab8解析

Lab8 locks

在本实验中,您将获得重新设计代码以提高并行编程的经验。多核计算机上并行性差的常见原因是锁争用较高。改善并行性通常涉及更改数据结构和锁定策略,以减少争用。 您将为xv6内存分配器和块缓存执行此操作。

Memory allocator(中等)

程序user/kalloctest测试了xv6的内存分配器,三个进程收缩、扩展它们的地址空间,因为扩展和收缩地址空间本质上使用的是kallockfree,而整个内核其实只有一个空闲的内存链表,显然如果多个CPU需要分配内存必然会抢占这个链表导致冲突严重。

#fetch-and-add这个后面的数字越大,说明冲突越严重。

最最核心的问题就是, kalloc() 只有一个单空闲列表,由一个单一的锁进行保护。为了避免严重的锁竞争,你需要重新设计内存分配器和列表;比较简单的一个方法是,为每一个CPU核心都创建一个空闲列表并且每个列表都有自己的锁。一个挑战就是当一个CPU已经用完了空闲的列表,但是另外一个CPU仍然有空闲列表,这种情况下可以去另外一个CPU进行窃取,虽然窃取会引发锁竞争,但是这种情况非常少发生。

hints:

  • 可以使用 kernel/param.h中的常量 NCPU
  • 记得重写 freerange
  • 使用cpuid()可以获取当前CPU的编号,但是只有当中断关闭的前提下才可以。你可以使用 push_off()pop_off() 来进行相应的操作。
  • 看一眼 kernel/sprintf.c中的snprintf函数是很有效的。

数据结构

1
2
3
4
struct {
struct spinlock lock;
struct run *freelist;
} kmem[NCPU];

按照对应的推荐,每一个CPU一个内存分配列表。

修改kinit

1
2
3
4
5
6
7
8
9
void
kinit()
{
for(int i = 0;i<NCPU;i++){
initlock(&kmem[i].lock, "kmem");
}

freerange(end, (void*)PHYSTOP);
}

修改kfree

1
2
3
4
5
6
7
8
9
10
.......
r = (struct run*)pa;

push_off();
int id = cpuid();
acquire(&kmem[id].lock);
r->next = kmem[id].freelist;
kmem[id].freelist = r;
release(&kmem[id].lock);
pop_off();

其实只有获取cpuid的时候才需要push_off()pop_off(),上面有点把关中断的范围扩大了,不是很好。

修改kalloc 新增偷取功能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
void *
kalloc(void)
{
struct run *r;

push_off();
int id = cpuid();
pop_off();
acquire(&kmem[id].lock);
r = kmem[id].freelist;
if(r)
kmem[id].freelist = r->next;
else {
// 从别的cpu那里偷
for (int i = 0; i < NCPU; i++)
{
if (i == id){
// 自己的话直接跳过
continue;
}
acquire(&kmem[i].lock);
r = kmem[i].freelist;
if (r) {
kmem[i].freelist = r->next;
release(&kmem[i].lock);
break;
}
release(&kmem[i].lock);
}
}
release(&kmem[id].lock);

if(r)
memset((char*)r, 5, PGSIZE); // fill with junk
return (void*)r;
}

偷取功能其实比想象的要简单很多,就是遍历各个CPU,然后尝试获取锁并且移动它们的空闲链表,如果成功就成功偷取到了,失败就只能算了。

Buffer cache(困难)

这一部分的实验和上面一部分完全没有关系。

如果多个进程同时访问文件系统,那么它们就会竞争 bcache.lock,这个锁是用来竞争所有的缓存的区域的。

减少bcache.lock的征用做起来可比上面的kalloc难多了,因为它们本来就是在多个进程中共享的;对于kalloc,可以通过为每个CPU来分配自己的分配器(分配自己的内存)有效避免竞争,但是这个方法对于buffer来说是没有用的。这个时候就可以用一个分桶的哈希表来实现。

下面这些冲突是被允许的:

  • 两个进程使用相同的block number
  • 两个进程没有命中cache,所以需要找到一个Block去替换。
  • 两个进程同时命中哈希表中的同一个地方,这也是允许的,只不过可以调整哈希表的大小来让这种情况尽可能少的发生。

hints:

  • 好好阅读xv6book中的8.1到8.3章节。
  • 你创建的哈希表可以不具有动态扩展功能,推荐使用素数(如13)来减少冲突的可能性。
  • 在哈希表中搜索缓冲区,找不到的时候需要分配一个entry,这个过程必须是原子性的。
  • 移除所有的buffer,使用上一次的时间戳。这样的话brelse就不需要bcache锁。
  • 当在高速缓存中未找到查询时,bget会选择要重新使用的缓冲区的部分
  • 在某些情况下,可能同时需要持有两个锁,确保避免死锁。
  • 当新的块与旧的块映射到同一个桶中的时候,在这种情况下需要确保不会发生死锁。
  • 调试技巧:可以通过设置CPU=1来进行调试。

总得来说,就是给硬盘的缓存分配来重新设计一个锁,而且也很明确lab就是要求实现一个哈希表。但是我个人是完全没有理解tick那部分在讲什么东西,要怎么实现,所以这部分我打算先跳过了。

然后就是每一个桶都一个锁的概念。这里用到是这样一种想法,最开始的时候,所有的缓存块其实和之前是一模一样的,也是统一在第一个桶里面。然后当其他的桶需要缓存块的时候,就从这个桶里拿(你可以理解为这些缓存块在之间进行移动)。难点就是,一个缓存块移动到另外一个桶里的时候,需要一些链表操作,是比较恶心的,其他都还好。

确定哈希桶的大小

1
#define NBUCKETS 13

就按照hint里面的,用13作为哈希表的大小就可以了。

修改bcache数据结构

原来的bcache只有一个锁,然后有一个双向的链表。修改之后的双向链表当然是需要保留的,但是锁肯定需要增加到和哈希桶一样多,所以应该是:

1
2
3
4
5
6
7
8
9
struct {
struct spinlock locks[NBUCKETS];
struct buf buf[NBUF];

// Linked list of all buffers, through prev/next.
// Sorted by how recently the buffer was used.
// head.next is most recent, head.prev is least.
struct buf head[NBUCKETS];
} bcache;

哈希算法

我们需要确定用哪个值来做哈希的key,以及哈希算法具体是什么。我这里用了最为简单的,直接利用取模计算来映射,而key就是使用的块号码。

1
2
3
4
5
int
hash(uint blockno)
{
return blockno % NBUCKETS;
}

binit

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
void
binit(void)
{
struct buf *b;

// 为每一个桶初始化锁
for (int i = 0; i < NBUCKETS; i++)
initlock(&bcache.locks[i], "bcache-bucket");

// Create linked list of buffers
// bcache.head.prev = &bcache.head;
// bcache.head.next = &bcache.head;

// 每一个桶子的head进行初始化
for (int i = 0; i < NBUCKETS; i++)
{
bcache.head[i].prev = &bcache.head[i];
bcache.head[i].next = &bcache.head[i];
}
for(b = bcache.buf; b < bcache.buf+NBUF; b++){
b->next = bcache.head[0].next;
b->prev = &bcache.head[0];
initsleeplock(&b->lock, "buffer");
bcache.head[0].next->prev = b;
bcache.head[0].next = b;
}
}

我上面是把所有的缓存块放到了第一个桶里,你可以放到任何一个桶里,也可以随便放,因为之后反正也是各个桶各种搬来搬去,并不影响。

bget

这个函数就是核心了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
static struct buf*
bget(uint dev, uint blockno)
{
struct buf *b;

// 获取对应的桶子
int hashid = hash(blockno);
acquire(&bcache.locks[hashid]);

// Is the block already cached?
for(b = bcache.head[hashid].next; b != &bcache.head[hashid]; b = b->next){
if(b->dev == dev && b->blockno == blockno){
b->refcnt++;
release(&bcache.locks[hashid]);
acquiresleep(&b->lock);
return b;
}
}

// Not cached.
// Recycle the least recently used (LRU) unused buffer.
// 遍历所有的桶找到可用的
int i = hashid;
while(1)
{
// 下一个桶子
i = (i + 1) % NBUCKETS;
// 如果就是当前自己这个桶,那么直接跳过
if (i == hashid)
continue;
// 获取锁
acquire(&bcache.locks[i]);

for (b = bcache.head[i].prev; b != &bcache.head[i]; b = b->prev)
{
if (b->refcnt == 0)
{
// 找到一个
b->dev = dev;
b->blockno = blockno;
b->valid = 0;
b->refcnt = 1;
// 断开连接
b->prev->next = b->next;
b->next->prev = b->prev;
// 释放对应的块的锁
release(&bcache.locks[i]);
// 连接到这个桶来
b->prev = &bcache.head[hashid];
b->next = bcache.head[hashid].next;
b->next->prev = b;
b->prev->next = b;
// 把这个桶的锁释放
release(&bcache.locks[hashid]);
acquiresleep(&b->lock);
return b;
}
}

release(&bcache.locks[i]);
}
panic("bget: no buffers");
}

其实难点是没有的,就是一些链表的操作,仔细一点应该没问题。

其它还有brelse、bpin和bunpin也有一些需要修改,不过都是非常简单的,所以这里就不贴出来了。

总结

这两个实验本质上,就是教你要用更加细粒度的锁,不要一把锁大家一起用,而是分成几个段,然后为每一个段都安排一个锁,这样就能够提升并发度了。