Lab8 locks 在本实验中,您将获得重新设计代码以提高并行编程的经验。多核计算机上并行性差的常见原因是锁争用较高。改善并行性通常涉及更改数据结构和锁定策略,以减少争用。 您将为xv6内存分配器和块缓存执行此操作。
Memory allocator(中等) 程序user/kalloctest测试了xv6的内存分配器,三个进程收缩、扩展它们的地址空间,因为扩展和收缩地址空间本质上使用的是kalloc和kfree,而整个内核其实只有一个空闲的内存链表,显然如果多个CPU需要分配内存必然会抢占这个链表导致冲突严重。
#fetch-and-add这个后面的数字越大,说明冲突越严重。
最最核心的问题就是, kalloc() 只有一个单空闲列表,由一个单一的锁进行保护。为了避免严重的锁竞争,你需要重新设计内存分配器和列表;比较简单的一个方法是,为每一个CPU核心都创建一个空闲列表并且每个列表都有自己的锁。一个挑战就是当一个CPU已经用完了空闲的列表,但是另外一个CPU仍然有空闲列表,这种情况下可以去另外一个CPU进行窃取,虽然窃取会引发锁竞争,但是这种情况非常少发生。
hints:
可以使用 kernel/param.h中的常量 NCPU
记得重写 freerange
使用cpuid()可以获取当前CPU的编号,但是只有当中断关闭的前提下才可以。你可以使用 push_off() 和 pop_off() 来进行相应的操作。
看一眼 kernel/sprintf.c中的snprintf函数是很有效的。
数据结构 1 2 3 4 struct { struct spinlock lock ; struct run *freelist ; } kmem[NCPU];
按照对应的推荐,每一个CPU一个内存分配列表。
修改kinit 1 2 3 4 5 6 7 8 9 void kinit() { for (int i = 0 ;i<NCPU;i++){ initlock(&kmem[i].lock, "kmem" ); } freerange(end, (void *)PHYSTOP); }
修改kfree 1 2 3 4 5 6 7 8 9 10 ....... r = (struct run*)pa; push_off(); int id = cpuid();acquire(&kmem[id].lock); r->next = kmem[id].freelist; kmem[id].freelist = r; release(&kmem[id].lock); pop_off();
其实只有获取cpuid的时候才需要push_off()和pop_off(),上面有点把关中断的范围扩大了,不是很好。
修改kalloc 新增偷取功能 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 void *kalloc(void ) { struct run *r ; push_off(); int id = cpuid(); pop_off(); acquire(&kmem[id].lock); r = kmem[id].freelist; if (r) kmem[id].freelist = r->next; else { for (int i = 0 ; i < NCPU; i++) { if (i == id){ continue ; } acquire(&kmem[i].lock); r = kmem[i].freelist; if (r) { kmem[i].freelist = r->next; release(&kmem[i].lock); break ; } release(&kmem[i].lock); } } release(&kmem[id].lock); if (r) memset ((char *)r, 5 , PGSIZE); return (void *)r; }
偷取功能其实比想象的要简单很多,就是遍历各个CPU,然后尝试获取锁并且移动它们的空闲链表,如果成功就成功偷取到了,失败就只能算了。
Buffer cache(困难) 这一部分的实验和上面一部分完全没有关系。
如果多个进程同时访问文件系统,那么它们就会竞争 bcache.lock,这个锁是用来竞争所有的缓存的区域的。
减少bcache.lock的征用做起来可比上面的kalloc难多了,因为它们本来就是在多个进程中共享的;对于kalloc,可以通过为每个CPU来分配自己的分配器(分配自己的内存)有效避免竞争,但是这个方法对于buffer来说是没有用的。这个时候就可以用一个分桶的哈希表来实现。
下面这些冲突是被允许 的:
两个进程使用相同的block number
两个进程没有命中cache,所以需要找到一个Block去替换。
两个进程同时命中哈希表中的同一个地方,这也是允许的,只不过可以调整哈希表的大小来让这种情况尽可能少的发生。
hints:
好好阅读xv6book中的8.1到8.3章节。
你创建的哈希表可以不具有动态扩展 功能,推荐使用素数(如13)来减少冲突的可能性。
在哈希表中搜索缓冲区,找不到的时候需要分配一个entry,这个过程必须是原子性的。
移除所有的buffer,使用上一次的时间戳。这样的话brelse就不需要bcache锁。
当在高速缓存中未找到查询时,bget会选择要重新使用的缓冲区的部分
在某些情况下,可能同时需要持有两个锁,确保避免死锁。
当新的块与旧的块映射到同一个桶中的时候,在这种情况下需要确保不会发生死锁。
调试技巧:可以通过设置CPU=1来进行调试。
总得来说,就是给硬盘的缓存分配来重新设计一个锁,而且也很明确lab就是要求实现一个哈希表。但是我个人是完全没有理解tick那部分在讲什么东西,要怎么实现,所以这部分我打算先跳过了。
然后就是每一个桶都一个锁的概念。这里用到是这样一种想法,最开始的时候,所有的缓存块其实和之前是一模一样的,也是统一在第一个桶里面。然后当其他的桶需要缓存块的时候,就从这个桶里拿(你可以理解为这些缓存块在之间进行移动)。难点就是,一个缓存块移动到另外一个桶里的时候,需要一些链表操作,是比较恶心的,其他都还好。
确定哈希桶的大小
就按照hint里面的,用13作为哈希表的大小就可以了。
修改bcache数据结构 原来的bcache只有一个锁,然后有一个双向的链表。修改之后的双向链表当然是需要保留的,但是锁肯定需要增加到和哈希桶一样多,所以应该是:
1 2 3 4 5 6 7 8 9 struct { struct spinlock locks [NBUCKETS ]; struct buf buf [NBUF ]; struct buf head [NBUCKETS ]; } bcache;
哈希算法 我们需要确定用哪个值来做哈希的key,以及哈希算法具体是什么。我这里用了最为简单的,直接利用取模计算来映射,而key就是使用的块号码。
1 2 3 4 5 int hash(uint blockno) { return blockno % NBUCKETS; }
binit 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 void binit(void ) { struct buf *b ; for (int i = 0 ; i < NBUCKETS; i++) initlock(&bcache.locks[i], "bcache-bucket" ); for (int i = 0 ; i < NBUCKETS; i++) { bcache.head[i].prev = &bcache.head[i]; bcache.head[i].next = &bcache.head[i]; } for (b = bcache.buf; b < bcache.buf+NBUF; b++){ b->next = bcache.head[0 ].next; b->prev = &bcache.head[0 ]; initsleeplock(&b->lock, "buffer" ); bcache.head[0 ].next->prev = b; bcache.head[0 ].next = b; } }
我上面是把所有的缓存块放到了第一个桶里,你可以放到任何一个桶里,也可以随便放,因为之后反正也是各个桶各种搬来搬去,并不影响。
bget 这个函数就是核心了:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 static struct buf *bget (uint dev , uint blockno ){ struct buf *b ; int hashid = hash(blockno); acquire(&bcache.locks[hashid]); for (b = bcache.head[hashid].next; b != &bcache.head[hashid]; b = b->next){ if (b->dev == dev && b->blockno == blockno){ b->refcnt++; release(&bcache.locks[hashid]); acquiresleep(&b->lock); return b; } } int i = hashid; while (1 ) { i = (i + 1 ) % NBUCKETS; if (i == hashid) continue ; acquire(&bcache.locks[i]); for (b = bcache.head[i].prev; b != &bcache.head[i]; b = b->prev) { if (b->refcnt == 0 ) { b->dev = dev; b->blockno = blockno; b->valid = 0 ; b->refcnt = 1 ; b->prev->next = b->next; b->next->prev = b->prev; release(&bcache.locks[i]); b->prev = &bcache.head[hashid]; b->next = bcache.head[hashid].next; b->next->prev = b; b->prev->next = b; release(&bcache.locks[hashid]); acquiresleep(&b->lock); return b; } } release(&bcache.locks[i]); } panic("bget: no buffers" ); }
其实难点是没有的,就是一些链表的操作,仔细一点应该没问题。
其它还有brelse、bpin和bunpin也有一些需要修改,不过都是非常简单的,所以这里就不贴出来了。
总结 这两个实验本质上,就是教你要用更加细粒度的锁,不要一把锁大家一起用,而是分成几个段,然后为每一个段都安排一个锁,这样就能够提升并发度了。