MIT 6.S081/Fall 2019实验 Lab: locks,多核计算机上并行性差的一个常见症状是高锁争用。为了减少争用,提高并行性通常需要同时改变数据结构和锁定策略。在这个实验中,分为两部分内容:为xv6内存分配器和磁盘缓存buffer cache重新设计代码提高并行性。
本文对主要思路和关键部分的代码实现进行说明,完整代码实现可以参考我的GitHub仓库.
1. 内存分配器
1.1 基本原理
xv6对上层提供kalloc
和kfree
接口来管理物理内存。通过kalloc
和kfree
,屏蔽了对物理内存的管理,使得调用者只需要关心虚拟地址空间,在需要使用新内存空间的时候调用kalloc
,在需要释放内存空间的时候调用kfree
。
在系统启动时,main()
函数(见kernel/main.c
)调用kinit()
来初始化分配器。kinit
通过保存所有空闲页来初始化链表。kinit
调用freerange
来把空闲内存加到链表里,freerange
是把每个空闲页逐一加到链表里来实现此功能的。
1.2 链表操作
释放内存
释放内存的函数是kfree(char *v)
,首先将 char *v
开始的页物理内存初始化为1,这是为了让之前使用它的代码不能再读取到有效的内容,期望这些代码能尽早崩溃以发现问题所在。然后将这空闲页物理内存加到链表头。
申请内存
kalloc(void *)
用来分配内存,移除并返回空闲链表头的第一个元素,即给调用者分配1页物理内存。
由于物理内存是在多进程之间共享的,所以不管是分配还是释放页面,每次操作kmem.freelist
时都需要先申请kmem.lock
,此后再进行内存页面的操作。
1.3 Lock
kalloc.c
中调用acquire()
和release()
来获取锁和释放锁,kalloc只在kalloc()
和kfree()
中使用了锁,都是把对freelist的操作锁了起来。kfree()
在向freelist里加节点前锁了一下,操作完之后解锁。kalloc()
在移除freelist第一个元素时也加锁,操作完成再释放锁。所以对于内存分配器中需要锁保护的只有对freelist的操作。
1.4 具体步骤
-
将kalloc的共享freelist改为每个CPU独立的freelist;
struct run { struct run *next; }; struct kmem{ struct spinlock lock; struct run *freelist; }; struct kmem kmems[NCPU]; void kinit() { char *lockname = "kmem 0"; for (int i = 0; i < NCPU; i++) { lockname[5] = '0' + i; initlock(&kmems[i].lock, lockname); } freerange(end, (void*)PHYSTOP); }
-
获取内存块时,优先分配当前CPU的freelist中的内存块;
-
当前CPU没有空闲内存块,则从其他CPU的freelist中窃取内存块;
-
所有CPU都没有空闲块时,返回0;
void *kalloc(void) { struct run *r; push_off(); int cpu_num = cpuid(); acquire(&kmems[cpu_num].lock); // 加锁 r = kmems[cpu_num].freelist; if (r) { // 优先分配freelist里面的内存块 kmems[cpu_num].freelist = r->next; release(&kmems[cpu_num].lock); } else { // 从其他CPU的freelist进行窃取 release(&kmems[cpu_num].lock); for (int i = 0; i < NCPU; i++) { acquire(&kmems[i].lock); r = kmems[i].freelist; if (r) { kmems[i].freelist = r->next; release(&kmems[i].lock); break; } release(&kmems[i].lock); // 解锁 } } pop_off(); if(r) memset((char*)r, 5, PGSIZE); // fill with junk return (void*)r; }
-
释放内存块时,将内存块放入当前CPU的freelist中;
void kfree(void *pa) { struct run *r; if(((uint64)pa % PGSIZE) != 0 || (char*)pa < end || (uint64)pa >= PHYSTOP) panic("kfree"); // Fill with junk to catch dangling refs. memset(pa, 1, PGSIZE); r = (struct run*)pa; push_off(); int cpu_num = cpuid(); acquire(&kmems[cpu_num].lock); // 加锁 r->next = kmems[cpu_num].freelist; // 加入freelist kmems[cpu_num].freelist = r; release(&kmems[cpu_num].lock); // 解锁 pop_off(); }
-
为2-5步的过程合理加锁,保证每个freelist的一致性;
2. 磁盘缓存
2.1 bcache
数据结构bcache
(见kernel/bio.c
)维护了一个由静态数组struct buf buf[NBUF]
组成的双向链表,它以块为单位,每次读入或写出一个磁盘块,放到一个内存缓存块中(bcache.buf
),同时自旋锁bcache.lock
用于用户互斥访问。所有对缓存块的访问都是通过bcache.head
引用链表来实现的,而不是buf
数组。
struct {
struct spinlock lock;
struct buf buf[NBUF];
// Linked list of all buffers, through prev/next.
// head.next is most recently used.
struct buf head;
} bcache;
每个缓存块都由三个部分组成,其中data字段标示了它的内容,指针字段(*prev,*next)用于组成链表,数值字段用于标示它的属性,如,字段valid
的意思是缓存区包含了一个块的复制(即该buffer包含对应磁盘块的数据),字段disk
的意思是缓存区的内容已经被提交到了磁盘,字段dev
是设备号,字段blockno
是缓存数据块号,字段refcnt
是被引用次数,lock
是睡眠锁。
struct buf {
int valid; // has data been read from disk?
int disk; // does disk "own" buf?
uint dev;
uint blockno;
struct sleeplock lock;
uint refcnt;
struct buf *prev; // LRU cache list
struct buf *next;
uchar data[BSIZE];
};
对bcache
的操作如下:
- 在系统启动时,
main()
函数(见kernel/main.c
)调用binit()
来初始化缓存,随即调用initlock()
初始化bcache.lock
,然后循环遍历buf
数组,采用头插法逐个链接到bcache.head
后面。 - 上层文件系统读磁盘时,调用
bread()
,随即调用bget()
检查请求的磁盘块是否在缓存中,如果命中,返回缓存命令结果。如果未命中,转到底层的virtio_disk_rw()
函数先此磁盘块从磁盘加载进缓存中,再返回此磁盘块。 - 上层文件写磁盘时,调用
bwrite()
,随即调用virtio_disk_rw()
函数直接将缓存中的数据写入磁盘。 - 上层文件系统可通过调用
brelse()
释放一块不再使用的缓存块。
2.2 锁
在bio.c
中,一共使用两种类型的锁:自旋锁spinlock
(bcache.lock
)和睡眠锁sleeplock
(b.lock
)。
自旋锁
bcache.lock
用于表示当前访问的bcache缓存块数据结构的是否被锁住。 当bcache.lock
为0时表示为锁住,能够访问当前数据结构bcache,如果为1,即暂时无法获得锁,则不断循环、自旋、等待锁重新可用。
bget()
在操作bcache数据结构(修改refcnt
、dev
、blockno
、valid
)时,需要获取到自旋锁bcache.lock
,操作完成后再释放该锁。
brelse()
需要获取到自旋锁bcache.lock
,才能将refcnt
(引用计数)减1,且只有在refcnt
为0时,将该数据缓存块插入到bcache.head
链表后面,操作完成后再释放该锁。
bpin()
和bunpin()
获取到锁后,才能修改refcnt
,操作完成后再释放该锁。
睡眠锁
b.lock
用于表示bcache缓存块数据结构中的当前缓存数据块buf是否被锁住,当b.lock
为1时,则调用sleep()
睡眠等待锁重新可用,为0则表示锁已经被释放。睡眠锁的三个接口函数如下:
acquiresleep()
:查询b.lock
是否被锁,如果被锁了,就睡眠,让出CPU,直到wakeup()
唤醒后,获取到锁,并将b.lock
置1。releasesleep()
:释放锁,并调用wakeup()holdingsleep()
:返回锁的状态(1:锁住或0:未锁)
bget()
在获取到缓存块(命中的缓存块,或者,未命中时通过LRU算法替换出来缓存中的缓存块)后,调用acquiresleep()
获取睡眠锁。
bwrite()
在写入到磁盘之前,先调用holdingsleep()
查询是否已经获取到该睡眠锁,确保有带锁而入。
brelse()
也先调用holdingsleep()
查询是否已经获取到该睡眠锁,确保有带锁后,才调用releasesleep()
释放该锁。
2.3 具体步骤
-
构建
bcache
数据结构哈希表;#define HashSize 17 struct { struct spinlock lock[HashSize]; struct buf buf[NBUF]; // Linked list of all buffers, through prev/next. // head.next is most recently used. // struct buf head; struct buf hashbucket[HashSize]; } bcache; int hashconflict(int number) { return (number + 1) % HashSize; } void binit(void) { struct buf *b; // initial every lock for (int i = 0; i < HashSize; i++) { char *lockname = "bcache 00"; initlock(&bcache.lock[i], lockname); // Create linked list of buffers bcache.hashbucket[i].prev = &bcache.hashbucket[i]; bcache.hashbucket[i].next = &bcache.hashbucket[i]; } // use bcache.hashbucket[0] to store temporally, made a mistake there (put it in the loop above) for (b = bcache.buf; b < bcache.buf+NBUF; b++) { b->next = bcache.hashbucket[0].next; b->prev = &bcache.hashbucket[0]; initsleeplock(&b->lock, "buffer"); bcache.hashbucket[0].next->prev = b; bcache.hashbucket[0].next = b; } }
-
修改
bget()
和brelse()
使得查找和释放缓存中的不同块时,锁之间的冲突更少;- 首先在硬件层面寻找空闲的缓存块,如果没有则分配一个新的缓存块;
static struct buf* bget(uint dev, uint blockno) { // get the hash index of blockno int locknumber = blockno % HashSize; struct buf *b; acquire(&bcache.lock[locknumber]); // Is the block already cached? for(b = bcache.hashbucket[locknumber].next; b != &bcache.hashbucket[locknumber]; b = b->next){ if(b->dev == dev && b->blockno == blockno){ b->refcnt++; release(&bcache.lock[locknumber]); acquiresleep(&b->lock); return b; } } int hashnumber = hashconflict(locknumber); // Not cached; recycle an unused buffer. while (hashnumber != locknumber) { acquire(&bcache.lock[hashnumber]); for(b = bcache.hashbucket[hashnumber].prev; b != &bcache.hashbucket[hashnumber]; b = b->prev){ if(b->refcnt == 0) { b->dev = dev; b->blockno = blockno; b->valid = 0; b->refcnt = 1; // take out the buf b->next->prev = b->prev; b->prev->next = b->next; release(&bcache.lock[hashnumber]); // put the buf in locknumber bcache b->next = bcache.hashbucket[locknumber].next; b->prev = &bcache.hashbucket[locknumber]; bcache.hashbucket[locknumber].next->prev = b; bcache.hashbucket[locknumber].next = b; release(&bcache.lock[locknumber]); acquiresleep(&b->lock); return b; } } release(&bcache.lock[hashnumber]); hashnumber = hashconflict(hashnumber); } panic("bget: no buffers"); }
-
释放一个已经锁住的缓存块,将其移到MRU list的头部
void brelse(struct buf *b) { uint blockno = b->blockno; int locknumber = blockno % HashSize; if(!holdingsleep(&b->lock)) panic("brelse"); releasesleep(&b->lock); acquire(&bcache.lock[locknumber]); b->refcnt--; if (b->refcnt == 0) { // no one is waiting for it. b->next->prev = b->prev; b->prev->next = b->next; b->next = bcache.hashbucket[locknumber].next; b->prev = &bcache.hashbucket[locknumber]; bcache.hashbucket[locknumber].next->prev = b; bcache.hashbucket[locknumber].next = b; } release(&bcache.lock[locknumber]); }
-
修改
bpin()
和bunpin()
增加或减少refcnt
之前获取对应数据缓存块的锁,操作完成之后再释放锁。void bpin(struct buf *b) { uint blockno = b->blockno % HashSize; acquire(&bcache.lock[blockno]); b->refcnt++; release(&bcache.lock[blockno]); } void bunpin(struct buf *b) { uint blockno = b->blockno % HashSize; acquire(&bcache.lock[blockno]); b->refcnt--; release(&bcache.lock[blockno]); }