MIT 6.S081/Fall 2019 Lab7 locks

2020-11-24
7 min read

MIT 6.S081/Fall 2019实验 Lab: locks,多核计算机上并行性差的一个常见症状是高锁争用。为了减少争用,提高并行性通常需要同时改变数据结构和锁定策略。在这个实验中,分为两部分内容:为xv6内存分配器和磁盘缓存buffer cache重新设计代码提高并行性。

本文对主要思路和关键部分的代码实现进行说明,完整代码实现可以参考我的GitHub仓库.

1. 内存分配器

1.1 基本原理

xv6对上层提供kallockfree接口来管理物理内存。通过kallockfree,屏蔽了对物理内存的管理,使得调用者只需要关心虚拟地址空间,在需要使用新内存空间的时候调用kalloc,在需要释放内存空间的时候调用kfree

在系统启动时,main()函数(见kernel/main.c)调用kinit()来初始化分配器。kinit通过保存所有空闲页来初始化链表。kinit调用freerange来把空闲内存加到链表里,freerange是把每个空闲页逐一加到链表里来实现此功能的。

1.2 链表操作

释放内存

释放内存的函数是kfree(char *v),首先将 char *v 开始的页物理内存初始化为1,这是为了让之前使用它的代码不能再读取到有效的内容,期望这些代码能尽早崩溃以发现问题所在。然后将这空闲页物理内存加到链表头。

申请内存

kalloc(void *)用来分配内存,移除并返回空闲链表头的第一个元素,即给调用者分配1页物理内存。

由于物理内存是在多进程之间共享的,所以不管是分配还是释放页面,每次操作kmem.freelist时都需要先申请kmem.lock,此后再进行内存页面的操作。

1.3 Lock

kalloc.c中调用acquire()release()来获取锁和释放锁,kalloc只在kalloc()kfree()中使用了锁,都是把对freelist的操作锁了起来。kfree()在向freelist里加节点前锁了一下,操作完之后解锁。kalloc()在移除freelist第一个元素时也加锁,操作完成再释放锁。所以对于内存分配器中需要锁保护的只有对freelist的操作。

1.4 具体步骤

  1. 将kalloc的共享freelist改为每个CPU独立的freelist;

    struct run {
      struct run *next;
    };
    
    struct kmem{
      struct spinlock lock;
      struct run *freelist;
    };
    
    struct kmem kmems[NCPU];
    
    void kinit()
    {
      char *lockname = "kmem 0";
      for (int i = 0; i < NCPU; i++) {
        lockname[5] = '0' + i;
        initlock(&kmems[i].lock, lockname);
      }
      freerange(end, (void*)PHYSTOP);
    }
    
  2. 获取内存块时,优先分配当前CPU的freelist中的内存块;

  3. 当前CPU没有空闲内存块,则从其他CPU的freelist中窃取内存块;

  4. 所有CPU都没有空闲块时,返回0;

    void *kalloc(void)
    {
      struct run *r;
      push_off();
      int cpu_num = cpuid();
      acquire(&kmems[cpu_num].lock);  // 加锁
      r = kmems[cpu_num].freelist;
      if (r) {  // 优先分配freelist里面的内存块
        kmems[cpu_num].freelist = r->next;
        release(&kmems[cpu_num].lock);
      } else {  // 从其他CPU的freelist进行窃取
        release(&kmems[cpu_num].lock);
        for (int i = 0; i < NCPU; i++) {
          acquire(&kmems[i].lock);
          r = kmems[i].freelist;
          if (r) { 
            kmems[i].freelist = r->next;
            release(&kmems[i].lock);
            break;
          }
          release(&kmems[i].lock);  // 解锁
        }
      }
      pop_off();
    
      if(r) memset((char*)r, 5, PGSIZE); // fill with junk
    
      return (void*)r;
    }
    
  5. 释放内存块时,将内存块放入当前CPU的freelist中;

    void kfree(void *pa)
    {
      struct run *r;
    
      if(((uint64)pa % PGSIZE) != 0 || (char*)pa < end || (uint64)pa >= PHYSTOP)
        panic("kfree");
    
      // Fill with junk to catch dangling refs.
      memset(pa, 1, PGSIZE);
    
      r = (struct run*)pa;
      push_off();
      int cpu_num = cpuid();
      acquire(&kmems[cpu_num].lock);  // 加锁
      r->next = kmems[cpu_num].freelist;	// 加入freelist
      kmems[cpu_num].freelist = r;
      release(&kmems[cpu_num].lock);  // 解锁
      pop_off();
    }
    
  6. 为2-5步的过程合理加锁,保证每个freelist的一致性;

2. 磁盘缓存

2.1 bcache

数据结构bcache(见kernel/bio.c)维护了一个由静态数组struct buf buf[NBUF]组成的双向链表,它以块为单位,每次读入或写出一个磁盘块,放到一个内存缓存块中(bcache.buf),同时自旋锁bcache.lock用于用户互斥访问。所有对缓存块的访问都是通过bcache.head引用链表来实现的,而不是buf数组。

struct {
  struct spinlock lock;
  struct buf buf[NBUF];

  // Linked list of all buffers, through prev/next.
  // head.next is most recently used.
  struct buf head;
} bcache;

每个缓存块都由三个部分组成,其中data字段标示了它的内容,指针字段(*prev,*next)用于组成链表,数值字段用于标示它的属性,如,字段valid的意思是缓存区包含了一个块的复制(即该buffer包含对应磁盘块的数据),字段disk的意思是缓存区的内容已经被提交到了磁盘,字段dev是设备号,字段blockno是缓存数据块号,字段refcnt是被引用次数,lock是睡眠锁。

struct buf {
  int valid;   // has data been read from disk?
  int disk;    // does disk "own" buf?
  uint dev;
  uint blockno;
  struct sleeplock lock;
  uint refcnt;
  struct buf *prev; // LRU cache list
  struct buf *next;
  uchar data[BSIZE];
};

bcache的操作如下:

  • 在系统启动时,main()函数(见kernel/main.c)调用binit()来初始化缓存,随即调用initlock()初始化bcache.lock,然后循环遍历buf数组,采用头插法逐个链接到bcache.head后面。
  • 上层文件系统读磁盘时,调用bread(),随即调用bget()检查请求的磁盘块是否在缓存中,如果命中,返回缓存命令结果。如果未命中,转到底层的virtio_disk_rw()函数先此磁盘块从磁盘加载进缓存中,再返回此磁盘块。
  • 上层文件写磁盘时,调用bwrite(),随即调用virtio_disk_rw()函数直接将缓存中的数据写入磁盘。
  • 上层文件系统可通过调用brelse()释放一块不再使用的缓存块。

2.2 锁

bio.c中,一共使用两种类型的锁:自旋锁spinlockbcache.lock)和睡眠锁sleeplockb.lock)。

自旋锁

bcache.lock用于表示当前访问的bcache缓存块数据结构的是否被锁住。 当bcache.lock为0时表示为锁住,能够访问当前数据结构bcache,如果为1,即暂时无法获得锁,则不断循环、自旋、等待锁重新可用。

bget()在操作bcache数据结构(修改refcntdevblocknovalid)时,需要获取到自旋锁bcache.lock,操作完成后再释放该锁。

brelse()需要获取到自旋锁bcache.lock,才能将refcnt(引用计数)减1,且只有在refcnt为0时,将该数据缓存块插入到bcache.head链表后面,操作完成后再释放该锁。

bpin()bunpin()获取到锁后,才能修改refcnt,操作完成后再释放该锁。

睡眠锁

b.lock用于表示bcache缓存块数据结构中的当前缓存数据块buf是否被锁住,当b.lock为1时,则调用sleep()睡眠等待锁重新可用,为0则表示锁已经被释放。睡眠锁的三个接口函数如下:

  • acquiresleep():查询b.lock是否被锁,如果被锁了,就睡眠,让出CPU,直到wakeup()唤醒后,获取到锁,并将b.lock置1。
  • releasesleep():释放锁,并调用wakeup()
  • holdingsleep():返回锁的状态(1:锁住或0:未锁)

bget()在获取到缓存块(命中的缓存块,或者,未命中时通过LRU算法替换出来缓存中的缓存块)后,调用acquiresleep()获取睡眠锁。

bwrite()在写入到磁盘之前,先调用holdingsleep()查询是否已经获取到该睡眠锁,确保有带锁而入。

brelse()也先调用holdingsleep()查询是否已经获取到该睡眠锁,确保有带锁后,才调用releasesleep()释放该锁。

2.3 具体步骤

  1. 构建bcache数据结构哈希表;

    #define HashSize 17
    struct {
      struct spinlock lock[HashSize];
      struct buf buf[NBUF];
    
      // Linked list of all buffers, through prev/next.
      // head.next is most recently used.
      // struct buf head;
      struct buf hashbucket[HashSize];
    } bcache;
    
    int hashconflict(int number) {
      return (number + 1) % HashSize;
    }
    
    void binit(void) {
      struct buf *b;
      // initial every lock
      for (int i = 0; i < HashSize; i++) {
        char *lockname = "bcache 00";
        initlock(&bcache.lock[i], lockname);
        // Create linked list of buffers
        bcache.hashbucket[i].prev = &bcache.hashbucket[i];
        bcache.hashbucket[i].next = &bcache.hashbucket[i];
      }
      // use bcache.hashbucket[0] to store temporally, made a mistake there (put it in the loop above)
      for (b = bcache.buf; b < bcache.buf+NBUF; b++) {
        b->next = bcache.hashbucket[0].next;
        b->prev = &bcache.hashbucket[0];
        initsleeplock(&b->lock, "buffer");
        bcache.hashbucket[0].next->prev = b;
        bcache.hashbucket[0].next = b;
      }
    }
    
  2. 修改bget()brelse()使得查找和释放缓存中的不同块时,锁之间的冲突更少;

    • 首先在硬件层面寻找空闲的缓存块,如果没有则分配一个新的缓存块;
    static struct buf* bget(uint dev, uint blockno)
    {
      // get the hash index of blockno
      int locknumber = blockno % HashSize;
      struct buf *b;
      acquire(&bcache.lock[locknumber]);
    
      // Is the block already cached?
      for(b = bcache.hashbucket[locknumber].next; b != &bcache.hashbucket[locknumber]; b = b->next){
        if(b->dev == dev && b->blockno == blockno){
          b->refcnt++;
          release(&bcache.lock[locknumber]);
          acquiresleep(&b->lock);
          return b;
        }
      }
      int hashnumber = hashconflict(locknumber);
      // Not cached; recycle an unused buffer.
      while (hashnumber != locknumber) {
        acquire(&bcache.lock[hashnumber]);
        for(b = bcache.hashbucket[hashnumber].prev; b != &bcache.hashbucket[hashnumber]; b = b->prev){
          if(b->refcnt == 0) {
            b->dev = dev;
            b->blockno = blockno;
            b->valid = 0;
            b->refcnt = 1;
            // take out the buf
            b->next->prev = b->prev;
            b->prev->next = b->next;
    
            release(&bcache.lock[hashnumber]);
            // put the buf in locknumber bcache
            b->next = bcache.hashbucket[locknumber].next;
            b->prev = &bcache.hashbucket[locknumber];
            bcache.hashbucket[locknumber].next->prev = b;
            bcache.hashbucket[locknumber].next = b;
    
            release(&bcache.lock[locknumber]);
            acquiresleep(&b->lock);
            return b;
          }
        }
        release(&bcache.lock[hashnumber]);
        hashnumber = hashconflict(hashnumber);
      }
      panic("bget: no buffers");
    }
    
    • 释放一个已经锁住的缓存块,将其移到MRU list的头部

      void brelse(struct buf *b)
      {
        uint blockno = b->blockno;
        int locknumber = blockno % HashSize;
        if(!holdingsleep(&b->lock))
          panic("brelse");
      
        releasesleep(&b->lock);
      
        acquire(&bcache.lock[locknumber]);
        b->refcnt--;
        if (b->refcnt == 0) {
          // no one is waiting for it.
          b->next->prev = b->prev;
          b->prev->next = b->next;
          b->next = bcache.hashbucket[locknumber].next;
          b->prev = &bcache.hashbucket[locknumber];
          bcache.hashbucket[locknumber].next->prev = b;
          bcache.hashbucket[locknumber].next = b;
        }
        release(&bcache.lock[locknumber]);
      }
      
  3. 修改bpin()bunpin()增加或减少refcnt之前获取对应数据缓存块的锁,操作完成之后再释放锁。

    void bpin(struct buf *b) {
      uint blockno = b->blockno % HashSize;
      acquire(&bcache.lock[blockno]);
      b->refcnt++;
      release(&bcache.lock[blockno]);
    }
    
    void bunpin(struct buf *b) {
      uint blockno = b->blockno % HashSize;
      acquire(&bcache.lock[blockno]);
      b->refcnt--;
      release(&bcache.lock[blockno]);
    }