锁的实现

自旋锁(Spin lock)的实现

  • 什么是自旋锁
    自旋锁是一种忙等待的锁机制。锁的特性就是只有一个进程可以获取锁,在任何时间点都不能有超过一个锁的持有者。
    acquire里面有一个死循环,循环中判断锁对象的locked字段是否为0,如果为0那表明当前锁没有持有者,当前对于acquire的调用可以获取锁。之后我们通过设置锁对象的locked字段为1来获取锁。最后返回。

但两个进程可能同时读到 locked 字段为 0.

这就需要原子指令 test-and-set,在RISC-V上,这个特殊的指令就是amoswap(atomic memory swap)。这个指令接收3个参数,分别是address,寄存器r1,寄存器r2。这条指令会先锁定住address,将address中的数据保存在一个临时变量中(tmp),之后将r1中的数据写入到地址中,之后再将保存在临时变量中的数据写入到r2中,最后再对于地址解锁。

  • 代码实现
    1
    2
    3
    4
    5
    6
    7
    8
    // Mutual exclusion lock.
    struct spinlock {
    uint locked; // Is the lock held?

    // For debugging:
    char *name; // Name of lock.
    struct cpu *cpu; // The cpu holding the lock.
    };
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    void
    acquire(struct spinlock *lk)
    {
    push_off(); // disable interrupts to avoid deadlock.
    if(holding(lk))
    panic("acquire");

    // On RISC-V, sync_lock_test_and_set turns into an atomic swap:
    // a5 = 1
    // s1 = &lk->locked
    // amoswap.w.aq a5, a5, (s1)
    while(__sync_lock_test_and_set(&lk->locked, 1) != 0)
    ;

    // Tell the C compiler and the processor to not move loads or stores
    // past this point, to ensure that the critical section's memory
    // references happen strictly after the lock is acquired.
    // On RISC-V, this emits a fence instruction.
    __sync_synchronize();

    // Record info about lock acquisition for holding() and debugging.
    lk->cpu = mycpu();
    }
    由于关闭了中断,所以这一个进程还是在同一个CPU核上运行的。
    1
    2
    3
    4
    5
    6
    7
    int
    holding(struct spinlock *lk)
    {
    int r;
    r = (lk->locked && lk->cpu == mycpu());
    return r;
    }

    __sync_synchronize() 函数的作用是创建一个内存屏障(memory barrier)或者称为内存栅栏(memory fence)。内存屏障是一种同步机制,用于确保在屏障之前的所有内存访问操作都在屏障之前完成,并且在屏障之后的所有内存访问操作都在屏障之后执行。
    告诉编译器和处理器在该点之前和之后的内存操作不能被重排序或优化。这样可以确保在自旋锁的临界区代码执行之前,所有对临界区相关的内存访问都已经完成,而在临界区代码执行之后,所有对临界区相关的内存访问都已经生效。

睡眠锁

xv6中还有一种锁比较有趣,叫做睡眠锁。
首先我们得知道自旋锁的缺点有什么。

持有锁的进程不会主动出让CPU,其他进程要获取锁,长时间的自旋也会引起长时间的浪费。

这里有一种矛盾,当持有锁的时候让出CPU给其他线程(sched)是违法的,因为其他线程如果也要acquire获取这把锁,就会导致死锁。同时,这种做法同样也违反了当自旋锁被持有的时候中断必须关闭的要求。
所以,我们就要设计一种锁,它在被acquire等待的时候能够让出CPU,以及允许持有这种锁的时候让出和中断的锁。

错误的设计:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
void V(struct semaphore *s)
{
acquire(&s->lock);
s->count += 1;
wakeup(s); // change this
release(&s->lock);
}

void P(struct semaphore *s)
{
while (s->count == 0)
sleep(s); // change this
acquire(&s->lock);
s->count -= 1;
release(&s->lock);
}

在这个实现中,存在一个问题,就是当一个线程发现count为0时,还没执行sleep函数时,另一个线程执行了V操作,count加了1,wakeup再把所有的chans的线程的statesleeping改为runable。接着P进程接着执行sleep函数,把自己的state改为sleeping
所以这就存在一个问题,P进程可能永远不会被唤醒,除非V进程再次调用了wakeup

改进

上面问题的根源就是P只在s->count==0时睡眠的不变量被正在运行的V给破坏了。
但是又不能在Pwhile循环前面申请锁,这样很明显会造成死锁。
正确实现:

修改sleep接口来修复上述方案:调用者必须传递一个条件锁给sleep,使得其可以在睡眠调用的进程并在睡眠通道上等待时释放锁。锁会强制并行的V等待直到P将它自己睡眠,因此wakeup会找到一个正在睡眠的消费者并唤醒它。一旦消费者被唤醒,sleep就需要在返回前再次获取锁。

1
2
3
4
5
6
7
8
9
// Long-term locks for processes
struct sleeplock {
uint locked; / Is the lock held?
struct spinlock lk; // spinlock protecting this sleep lock

// For debugging:
char *name; // Name of lock.
int pid; // Process holding lock
};

可以看到,sleeplock有两把锁。

在这之前,先简单了解一下sleep函数和wakeup函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
void sleep(void *chan, struct spinlock *lk)
{
struct proc *p = myproc();
acquire(&p->lock); //DOC: sleeplock1
release(lk);

// Go to sleep.
p->chan = chan;
p->state = SLEEPING;

sched();

// Tidy up.
p->chan = 0;

// Reacquire original lock.
release(&p->lock);
acquire(lk);
}

// Wake up all processes sleeping on chan.
// Must be called without any p->lock.
void wakeup(void *chan)
{
struct proc *p;

for(p = proc; p < &proc[NPROC]; p++) {
if(p != myproc()){
acquire(&p->lock);
if(p->state == SLEEPING && p->chan == chan) {
p->state = RUNNABLE;
}
release(&p->lock);
}
}
}

在xv6的proc结构体中,有一个chan字段用于实现进程的等待通道的。当一个进程需要等待某一个事件时,需要把chan字段设置为相应的通道或条件。

下面看acquiresleepreleasesleep函数的代码实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
void acquiresleep(struct sleeplock *lk)
{
acquire(&lk->lk);
while (lk->locked) {
sleep(lk, &lk->lk);
}
lk->locked = 1;
lk->pid = myproc()->pid;
release(&lk->lk);
}

void releasesleep(struct sleeplock *lk)
{
acquire(&lk->lk);
lk->locked = 0;
lk->pid = 0;
wakeup(lk);
release(&lk->lk);
}

acquiresleep函数首先会申请lk->lk这把锁,这使得在执行while语句的时候不会有其他线程的wakeup操作。
在进入sleep函数的时候,就可以释放lk->lk这把锁,其他线程可能会尝试调用wakeup,但进入sleep的线程会事先aquire(&myproc->lock),wakeup也会尝试获取线程的锁,所以这样不会引起唤醒丢失。


锁的实现
http://example.com/2024/02/29/操作系统/xv6-labs/锁的实现/
作者
LiuZhaocheng
发布于
2024年2月29日
许可协议