Scheduling

Code: Context switching

xv6中,每个CPU中的scheduler都有一个专用的线程。这里线程的概念是,有自己的栈,有自己的寄存器状态。

当发生时钟中断时,当前进程调用yield,yield再通过swtch切换到scheduler线程。scheduler线程会通过swtch跳转到另外一个进程去执行。当另外一个进程发生时钟中断,又会通过yield回到scheduler,scheduler再调度原进程继续执行,如此周而复始。

Linux的调度原理也差不多类似这样。每个CPU都有一个调度类为SCHED_CLASS_IDLE的IDLE进程,IDLE进程体大概就是间歇不断地执行__schedule()函数,CPU空闲时就会不断执行IDLE线程。

而当有新的任务产生时(或任务被唤醒。可以从此看出task new和task wakeup的共通点,可以联想到竞赛中对该消息的处理方法),它首先通过调度类对应的select_cpu选择一个合适的(可以被抢占&&在该task对应的cpumask中)的cpu,迁移到cpu对应的rq;目标cpu通过IDLE进程体或者中断返回时检查到了NEED_SCHEDULE标记位,从而调用schedule函数pick新任务,然后进行context_switch切换到目标线程。如此周而复始。

image-20230118221757367

下面就来讲讲这个所谓的“线程”以及xv6的上下文切换是怎么实现的。

context

上下文切换的操作对象是上下文,因而首先了解一下上下文的结构。各种寄存器的状态即是上下文context。xv6中的context定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
struct context {
uint64 ra;
uint64 sp;

// callee-saved
uint64 s0;
uint64 s1;
uint64 s2;
uint64 s3;
uint64 s4;
uint64 s5;
uint64 s6;
uint64 s7;
uint64 s8;
uint64 s9;
uint64 s10;
uint64 s11;
};

上下文切换需要修改栈和pc,context中确实有sp寄存器,但是没有pc寄存器,这主要还是因为当swtch返回时,会回到ra所指向的地方,所以仅保存ra就足够了。

swtch

上下文的切换是通过swtch实现的。

1
void            swtch(struct context*, struct context*);

swtch会把当前进程的上下文保存在第一个context中,再切换到第二个context保存的上下文,具体实现就是写读保存寄存器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# in kernel/swtch.S
# a0和a1分别保存着两个参数的值,也即第一个context的地址和第二个context的地址
.globl swtch
swtch:
sd ra, 0(a0)
sd sp, 8(a0)
sd s0, 16(a0)
sd s1, 24(a0)
# ...
sd s11, 104(a0)

ld ra, 0(a1)
ld sp, 8(a1)
ld s0, 16(a1)
ld s1, 24(a1)
# ...
ld 11, 104(a1)

ret

sched

在sleep、yield和wakeup中,都会通过sched中的swtch进入scheduler线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
void
sched(void)
{
int intena;
struct proc *p = myproc();

if(!holding(&p->lock))
panic("sched p->lock");
if(mycpu()->noff != 1)
panic("sched locks");
if(p->state == RUNNING)
panic("sched running");
if(intr_get()) // 当持有锁时一定为关中断状态
panic("sched interruptible");

intena = mycpu()->intena;
swtch(&p->context, &mycpu()->context);
mycpu()->intena = intena;
}

cpu中存储着的是scheduler线程的context。因而,这样就可以保存当前进程的context,读取scheduler线程的context,然后转换到scheduler的context执行了。

可以发现这里是有个很完美的组合技的。由sched()保存context到process结构体中,再由scheduler()读取process对应的context回归到sched()继续执行,我感觉调度设计这点真是帅的一匹。

scheduler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
void
scheduler(void)
{
struct proc *p;
struct cpu *c = mycpu();

c->proc = 0;
for(;;){
// Avoid deadlock by ensuring that devices can interrupt.
intr_on();

int nproc = 0;
for(p = proc; p < &proc[NPROC]; p++) {
acquire(&p->lock);
if(p->state != UNUSED) {
nproc++;
}
if(p->state == RUNNABLE) {
// Switch to chosen process. It is the process's job
// to release its lock and then reacquire it
// before jumping back to us.
p->state = RUNNING;
c->proc = p;
swtch(&c->context, &p->context);

// Process is done running for now.
// It should have changed its p->state before coming back.
c->proc = 0;
}
release(&p->lock);
}
if(nproc <= 2) { // only init and sh exist
intr_on();
asm volatile("wfi");
}
}
}

通过swtch进入scheduler线程后,会继续执行scheduler中swtch的下一个指令,完成下一次调度。

一些补充

以上是书本的介绍内容。看到这想必会有很多疑惑,至少有以下两点:

  1. 为什么cpu->context会存储着scheduler的上下文?这是什么时候,又是怎么初始化的?
  2. 为什么从sched中swtch会来到scheduler中swtch的下一句?

先从第一点入手。实际上,这个初始化的工作,是在操作系统启动时的main.c中完成的。

1
2
3
4
5
6
7
8
9
10
11
void
main()
{
if(cpuid() == 0){
// ...
} else {
// ...
}

scheduler();
}

在这之前,创建了第一个进程proc。在这里,每个cpu都调用了scheduler。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
void
scheduler(void)
{
struct proc *p;
struct cpu *c = mycpu();

c->proc = 0;
for(;;){
intr_on();

int nproc = 0;
for(p = proc; p < &proc[NPROC]; p++) {
acquire(&p->lock);
// ...
if(p->state == RUNNABLE) {
p->state = RUNNING;
c->proc = p;
swtch(&c->context, &p->context);

c->proc = 0;
}
release(&p->lock);
}
// ...
}
}

每个cpu都在scheduler线程的swtch(&c->context, &p->context);中,将当前的context,也即scheduler的context存入了mycpu()->context。随后,CPU中的某个去执行下一个进程,其他的就在scheduler线程的无限循环中等待,直到有别的进程产生。

去执行进程的CPU通过swtch切换上下文,切到了另一个进程中,此时在swtch中保存的ra是scheduler线程的swtch的下一句(因为scheduler->swtch也是个函数调用的过程)。会切到另一个进程的sched的下一句【因为它正是从那边swtch过来的】,或者是那个进程开始执行的地方【下面会提到是forkret】。另一个进程通过sched切换回来的时候,就正会切到ra所指向的位置,也即切到scheduler中的swtch后面。

这样一来,两个问题就都得到了解答。

从这,我们也能知道xv6是如何让CPU运转的:scheduler线程是CPU的IDLE状态。无事的时候在scheduler中等待,并且一直监测是否有进程需要执行。有的话,则去执行该进程;该进程又会通过sched切换回scheduler线程,继续等待。这样一来,就完成了进程管理的基本的自动机图像。

Code: Scheduling

sched前要做的事

A process that wants to give up the CPU must do three things:

  1. acquire its own process lock p->lock, release any other locks it is holding
  2. update its own state (p->state)
  3. call sched

yield (kernel/proc.c:515) follows this convention, as do sleep and exit.

sched double-checks those conditions (kernel/proc.c:499-504) and then an implication of those conditions: since a lock is held, interrupts should be disabled.

sched与scheduler

在上面的描述我们可以看到,schedscheduler联系非常密切,他们俩通过swtch相互切来切去,并且一直都只在这几行切来切去:

1
2
3
4
5
6
// in scheduler()
swtch(&c->context, &p->context);
c->proc = 0;
// in sched()
swtch(&p->context, &mycpu()->context);
mycpu()->intena = intena;

在两个线程之间进行这种样式化切换的过程有时被称为协程(coroutines)。

存在一种情况使得调度程序对swtch的调用没有以sched结束。一个新进程第一次被调度时,它从forkretkernel/proc.c:527)开始。Forkret是为了释放p->lock而包装的,要不然,新进程可以从usertrapret开始。

p->lock保证了并发安全性

考虑调度代码结构的一种方法是,它为每个进程强制维持一个不变性条件的集合,并在这些不变性条件不成立时持有p->lock

其中一个不变性条件是:如果进程是RUNNING状态,计时器中断的yield必须能够安全地从进程中切换出去;这意味着CPU寄存器必须保存进程的寄存器值(即swtch没有将它们移动到context中),并且c->proc必须指向进程。另一个不变性条件是:如果进程是RUNNABLE状态,空闲CPU的调度程序必须安全地运行它;这意味着p->context必须保存进程的寄存器(即,它们实际上不在实际寄存器中),没有CPU在进程的内核栈上执行,并且没有CPU的c->proc引用进程。

维护上述不变性条件是xv6经常在一个线程中获取p->lock并在另一个线程中释放它的原因,在保持p->lock时,这些属性通常不成立。

例如在yield中获取并在scheduler中释放。一旦yield开始修改一个RUNNING进程的状态为RUNNABLE,锁必须保持被持有状态,直到不变量恢复:最早的正确释放点是scheduler(在其自身栈上运行)清除c->proc之后。类似地,一旦scheduler开始将RUNNABLE进程转换为RUNNING,在内核线程完全运行之前(在swtch之后,例如在yield中)绝不能释放锁。

p->lock还保护其他东西:exitwait之间的相互作用,避免丢失wakeup的机制(参见第7.5节),以及避免一个进程退出和其他进程读写其状态之间的争用(例如,exit系统调用查看p->pid并设置p->killed(kernel/proc.c:611))。为了清晰起见,也许为了性能起见,有必要考虑一下p->lock的不同功能是否可以拆分。

p->lock在每次scheduler开始的时候获取,swtch到p进程的时候在yield等调用完sched的地方释放。而调用yield时获取的锁,又会在scheduler中释放。

1
2
3
4
5
6
7
8
9
10
// Give up the CPU for one scheduling round.
void
yield(void)
{
struct proc *p = myproc();
acquire(&p->lock);// 该锁会在scheduler中释放
p->state = RUNNABLE;
sched();
release(&p->lock);// 该锁释放的是scheduler中得到的锁
}
1
2
3
4
5
6
// in kernel/proc.c scheduler()
acquire(&p->lock);// 该锁会在yield等地被释放
// ...
swtch(&c->context, &p->context);
// ...
release(&p->lock);// 该锁会释放yield等地中获得的锁

不得不说,这结构实在是太精妙了。这中间的如此多的复杂过程,就这样成功地被锁保护了起来。

Code: mycpu and myproc

1
2
3
4
5
6
7
// Per-CPU state.
struct cpu {
struct proc *proc; // The process running on this cpu, or null.
struct context context; // swtch() here to enter scheduler().
int noff; // Depth of push_off() nesting.
int intena; // Were interrupts enabled before push_off()?
};

mycpu是通过获取当前cpuid来获取cpu结构的。当前使用的cpuid约定俗成地存在了tp寄存器里。为了让mycpu有效工作,必须确保tp寄存器始终存放的是当前cpu的hartid。

首先是在操作系统初始化的时候要把cpuid存入tp寄存器。RISC-V规定,mhartid也即cpuid的存放点只能在machine mode被读取。因而这项工作得在start.c中完成:

1
2
3
4
5
6
7
8
9
10
11
12
13
// in kernel/start.c 
// keep each CPU's hartid in its tp register, for cpuid().
int id = r_mhartid();
w_tp(id);
// in kernel/riscv.h
// which hart (core) is this?
static inline uint64
r_mhartid()
{
uint64 x;
asm volatile("csrr %0, mhartid" : "=r" (x) );
return x;
}

在内核态中,编译器被设置为保证不会以其他方式使用tp寄存器。因而初始化之后,内核态中每个CPU的tp寄存器就始终存放着自己的cpuid。

但这在用户进程是不成立的。因而必须在用户进程进入陷阱的时候做一些工作。

1
2
3
4
# in kernel/trampoline.S uservec
sd tp, 64(a0)
# make tp hold the current hartid, from p->trapframe->kernel_hartid
ld tp, 32(a0)
1
2
3
4
5
struct trapframe {
/* 32 */ uint64 kernel_hartid; // saved kernel tp
/* 64 */ uint64 tp;
// ...
}

必须在trampoline保存用户态中使用的tp值,以及内核态中对应的hartid。

最后再在返回用户态的时候恢复用户态的tp值以及更新trampoline的tp值。

1
2
// in kernel/trap.c usertrapret()
p->trapframe->kernel_hartid = r_tp(); // hartid for cpuid()
1
2
# in trampoline.S userret
ld tp, 64(a0)

注意,更新trampoline的tp值这一步很重要。因为如果在用户态发生的是时钟中断,就会引起yield,可能造成CPU的切换。这时候就需要在返回用户态的时候修改一下trapframe中的tp为当前CPU的tp。这样一来才能保证,在本次时钟中断结束,以及到下一次时钟中断修改CPU这一期间,trapframe中的tp寄存器以及内核态中的tp寄存器都是正确的。

通过mycpu()获取cpuid其实是非常脆弱的。因为你可能获取完cpuid,进程就被切到别的CPU去执行了,这就会有一个先检查后执行的竞态条件,具有并发安全隐患。因而,xv6要求使用mycpu()返回值的这段代码需要关中断,这样就可以避免时钟中断造成的进程切换了。比如说像myproc()这样:

1
2
3
4
5
6
7
8
9
// Return the current struct proc *, or zero if none.
struct proc*
myproc(void) {
push_off();
struct cpu *c = mycpu();
struct proc *p = c->proc;
pop_off();
return p;
}

注意,不同于mycpu(),使用myproc()的返回值不需要进行开关中断保护。因为当前进程的指针不论处于哪个CPU都是不变的。

Sleep and wakeup

前面我们已经介绍了进程隔离性的基本图像,接下来要讲xv6是如何让进程之间互动的。xv6使用的是经典的sleep and wakeup,也叫序列协调(sequence coordination)条件同步机制(conditional synchronization mechanisms。下面,将从最基本的自旋锁实现信号量开始,来逐步讲解xv6的sleep and wakeup机制。

自旋锁实现信号量

image-20230120150659730

image-20230120150715925

缺点就是自旋太久了,因而我们需要在等待的时候调用yield,直到资源生产出来之后再继续执行。

不安全的sleep and wakeup

Let’s imagine a pair of calls, sleep and wakeup, that work as follows:

  1. sleep(chan)

    Sleeps on the arbitrary value chan, called the wait channel. Sleep puts the calling process to sleep, releasing the CPU for other work.

  2. wakeup(chan)

    Wakes all processes sleeping on chan (if any), causing their sleep calls to return. If no processes are waiting on chan, wakeup does nothing.

这样一来,信号量实现就可修改为这样了:

image-20230120151051989

但是,我们可以注意到,在212-213行这里产生了一个先检查后执行的竞态条件。

如果消费者进程执行到212-213中间,此时生产者进程已经调用结束,也就是说wakeup并没有唤醒任何消费者进程。消费者进程就会一直在sleep中没人唤醒,除非生产者进程再执行一次。这样就会造成lost wake-up 这个问题。

所以,我们可以选择把这个竞态条件也放入s->lock这个锁区域保护。

image-20230120151353712

但是这样一来又会产生死锁问题。因而,我们可以尝试着修改sleep和wakeup的接口定义。

sleep and wakeup

We’ll fix the preceding scheme by changing sleep’s interface:

The caller must pass the condition lock to sleep so it can release the lock after the calling process is marked as asleep and waiting on the sleep channel. The lock will force a concurrent V to wait until P has finished putting itself to sleep, so that the wakeup will find the sleeping consumer and wake it up. Once the consumer is awake again sleep reacquires the lock before returning.

也即在sleep中:

1
2
3
4
5
6
7
sleep(s,&s->lock){
// do something
release(&s->lock);
//wait until wakeup
acquire(&s->lock);
return;
}

这样一来,信号量就可以完美实现了:

image-20230120151807102

image-20230120151820455

注:严格地说,wakeup只需跟在acquire之后就足够了(也就是说,可以在release之后调用wakeup

【想了一下,有一说一确实,放在release前后都不影响】

原始Unix内核的sleep只是禁用了中断,这就足够了,因为Unix运行在单CPU系统上。因为xv6在多处理器上运行,所以它为sleep添加了一个显式锁。

Code: Sleep and wakeup

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// Atomically release lock and sleep on chan.
// Reacquires lock when awakened.
void
sleep(void *chan, struct spinlock *lk)
{
struct proc *p = myproc();

// Must acquire p->lock in order to
// change p->state and then call sched.
// Once we hold p->lock, we can be
// guaranteed that we won't miss any wakeup
// (wakeup locks p->lock),
// so it's okay to release lk.
if(lk != &p->lock){ //DOC: sleeplock0
// 获取进程锁,释放外部锁
// 此进程锁将在scheduler线程中释放
acquire(&p->lock); //DOC: sleeplock1
release(lk);
}

// Go to sleep.
p->chan = chan;
p->state = SLEEPING;

sched();
// 到这里来,说明已经被wakeup且被调度了

// Tidy up.
p->chan = 0;

// Reacquire original lock.
if(lk != &p->lock){
//释放进程锁,获取外部锁
// 此进程锁是在scheduler中获取到的
release(&p->lock);
acquire(lk);
}
}

注意,如果lk为p->lock,那么lk依然会在scheduler线程中被暂时释放

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// Wake up all processes sleeping on chan.
// Must be called without any p->lock.
void
wakeup(void *chan)
{
struct proc *p;

for(p = proc; p < &proc[NPROC]; p++) {
acquire(&p->lock);
if(p->state == SLEEPING && p->chan == chan) {
p->state = RUNNABLE;
}
release(&p->lock);
}
}

可以注意到,关于chan这一变量的取值是非常任意的,仅需取一个约定俗成的值就OK。这里取为了信号量的地址,同时满足了逻辑需求和语义需求。

Callers of sleep and wakeup can use any mutually convenient number as the channel. Xv6 often uses the address of a kernel data structure involved in the waiting.

这里也解释了为什么需要while循环

有时,多个进程在同一个通道上睡眠;例如,多个进程读取同一个管道。一个单独的wakeup调用就能把他们全部唤醒。其中一个将首先运行并获取与sleep一同调用的锁,并且(在管道例子中)读取在管道中等待的任何数据。尽管被唤醒,其他进程将发现没有要读取的数据。从他们的角度来看,醒来是“虚假的”,他们必须再次睡眠。因此,在检查条件的循环中总是调用sleep

Code: Pipes

pipes很显然就是生产者消费者模式的一个例证。

1
2
3
4
5
6
7
8
struct pipe {
struct spinlock lock;
char data[PIPESIZE];
uint nread; // number of bytes read
uint nwrite; // number of bytes written
int readopen; // read fd is still open
int writeopen; // write fd is still open
};
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
int
piperead(struct pipe *pi, uint64 addr, int n)
{
int i;
struct proc *pr = myproc();
char ch;

acquire(&pi->lock);
while(pi->nread == pi->nwrite && pi->writeopen){ //DOC: pipe-empty并且依然有进程在写
if(pr->killed){
release(&pi->lock);
return -1;
}
// 等待直到pipe不为空
sleep(&pi->nread, &pi->lock); //DOC: piperead-sleep
}
for(i = 0; i < n; i++){ //DOC: piperead-copy
if(pi->nread == pi->nwrite)
break;
ch = pi->data[pi->nread++ % PIPESIZE];
if(copyout(pr->pagetable, addr + i, &ch, 1) == -1)
break;
}
// 唤醒写入管道的进程
wakeup(&pi->nwrite); //DOC: piperead-wakeup
release(&pi->lock);
return i;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
int
pipewrite(struct pipe *pi, uint64 addr, int n)
{
int i;
char ch;
struct proc *pr = myproc();

acquire(&pi->lock);
for(i = 0; i < n; i++){
while(pi->nwrite == pi->nread + PIPESIZE){ //DOC: pipewrite-full管道满则阻塞
if(pi->readopen == 0 || pr->killed){
release(&pi->lock);
return -1;
}
// 唤醒读取管道的进程
wakeup(&pi->nread);
sleep(&pi->nwrite, &pi->lock);
}
if(copyin(pr->pagetable, &ch, addr + i, 1) == -1)
break;
pi->data[pi->nwrite++ % PIPESIZE] = ch;
}
wakeup(&pi->nread);
release(&pi->lock);
return i;
}

一个非常有意思且巧妙的点,就是读写管道等待在不同的chan上,这与上面信号量的例子是不一样的。想想也确实,如果使用同一个管道的话,当唤醒的时候,就会把不论是读还是写的全部进程都唤醒过来,这对性能显然损失较大。

The pipe code uses separate sleep channels for reader and writer (pi->nread and pi->nwrite); this might make the system more effificient in the unlikely event that there are lots of readers and writers waiting for the same pipe.

Code: Wait, exit, and kill

exit和wait

Sleepwakeup可用于多种等待。第一章介绍的一个有趣的例子是子进程exit和父进程wait之间的交互。

xv6记录子进程终止直到wait观察到它的方式是让exit将调用方置于ZOMBIE状态,在那里它一直保持到父进程的wait注意到它,将子进程的状态更改为UNUSED,复制子进程的exit状态码,释放子进程,并将子进程ID返回给父进程。

如果父进程在子进程之前退出,则父进程将子进程交给init进程,init进程将永久调用wait;因此,每个子进程退出后都有一个父进程进行清理。

又是一个生产者消费者模式。只不过此时的chan是父进程,资源是僵尸子进程【草】。由于涉及到进程间的调度切换,因而实现稍微复杂了点。

为什么需要涉及到进程间的调度呢?子进程设置完僵尸状态后,直接通过函数ret不行吗?答案是不行,因为ret的话就会去到不知道哪的地方【大概率会变成scause=2的情况】,所以这里子进程想要退出,就得做几件事,一是依靠父进程,让父进程杀死子进程,二是把自己设置为一个特殊的状态,使得自己不会被调度从而执行ret指令出错,三是尽快让父进程杀死自己越快越好。综合上述三个原因,exit最终在调度方面的实现方式,就变成了,子进程设置自己为ZOMBIE态->启用调度->父进程杀死ZOMBIE态的子进程。这期间不变性条件的防护,就得依赖于锁,以及sleep和wakeup了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
void
exit(int status)
{
struct proc *p = myproc();

// ...

// we need the parent's lock in order to wake it up from wait().
// the parent-then-child rule says we have to lock it first.
// 整个xv6都必须遵守相同的顺序(父级,然后是子级)不论是锁定还是释放,都是先父再子
acquire(&original_parent->lock);
acquire(&p->lock);

// Give any children to init.
// 把自己的所有孩子都托付给init进程
// init进程就是在操作系统启动时
reparent(p);

// Parent might be sleeping in wait().
// 唤醒wait中的父进程
// 这里看上去很诡异,明明子进程状态还未完全,怎么就唤醒父亲了呢?但其实很安全。
// 此时子进程仍持有父进程的锁,如果有别的CPU中断进入scheduler线程,到父进程那时会卡在aquire
// 直到子进程完成后续工作后父进程才能被真正唤醒执行
wakeup1(original_parent);

p->xstate = status;
// 设为ZOMBIE态
p->state = ZOMBIE;

// 完成后续工作,解除父进程的锁
release(&original_parent->lock);

// Jump into the scheduler, never to return.
// 子进程会在父进程中被释放,所以永远不会回来
sched();
panic("zombie exit");
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
int
wait(uint64 addr)
{
struct proc *np;
int havekids, pid;
struct proc *p = myproc();

// hold p->lock for the whole time to avoid lost
// wakeups from a child's exit().
acquire(&p->lock);

for(;;){
// Scan through table looking for exited children.
havekids = 0;
for(np = proc; np < &proc[NPROC]; np++){
// this code uses np->parent without holding np->lock.
// acquiring the lock first would cause a deadlock,
// since np might be an ancestor, and we already hold p->lock.
// 下面的第一点其实一句话就可以搞定:
// 【它违反了先获取父亲锁,再获取子锁的xv6代码规定】
// 1.要是在这句话之前acquire的话,acquire到你爸,你爸这时候也刚好执行到这句话
// 那么就会造成你在自旋【此时你爸在wait一开始就得到了锁】,
// 你爸也在自旋【你也在wait一开始得到了锁】,这样就造成了死锁
// 2.并且由于np->parent只有parent才能改,所以数据是否过时是没关系的
// 因为如果不是你儿子,数据过时与否都知道不是你儿子
// 如果是你儿子,那数据压根就不会过时
if(np->parent == p){
// np->parent can't change between the check and the acquire()
// because only the parent changes it, and we're the parent.
acquire(&np->lock);
havekids = 1;
if(np->state == ZOMBIE){
// Found one.
pid = np->pid;
// 传递返回参数
if(addr != 0 && copyout(p->pagetable, addr, (char *)&np->xstate,
sizeof(np->xstate)) < 0) {
release(&np->lock);
release(&p->lock);
return -1;
}
freeproc(np);
release(&np->lock);
release(&p->lock);
return pid;
}
release(&np->lock);
}
}

// No point waiting if we don't have any children.
if(!havekids || p->killed){
release(&p->lock);
return -1;
}

// Wait for a child to exit.
// 暂时释放p锁,等待子进程获取退出
sleep(p, &p->lock); //DOC: wait-sleep
}
}

其中值得注意的几个点:

  1. wait中的sleep中释放的条件锁是等待进程的p->lock,这是上面提到的特例。

  2. exit会将自己的所有子进程交付给一直在等待着的init进程:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    for(;;){
    printf("init: starting sh\n");
    pid = fork();
    // ...
    for(;;){
    // this call to wait() returns if the shell exits,
    // or if a parentless process exits.
    wpid = wait((int *) 0);
    if(wpid == pid){
    // the shell exited; restart it.
    break;
    } else if(wpid < 0){
    printf("init: wait returned an error\n");
    exit(1);
    } else {
    // 这里!!
    // it was a parentless process; do nothing.
    }
    }
    }

    如果子进程退出,就会通过init的wait释放它们。然后init释放完它们后进入第三个if分支,继续进行循环。

  3. wakeup1

    Exit calls a specialized wakeup function, wakeup1, that wakes up only the parent, and only if it is sleeping in wait.

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    // Wake up p if it is sleeping in wait(); used by exit().
    // Caller must hold p->lock.
    static void
    wakeup1(struct proc *p)
    {
    if(!holding(&p->lock))
    panic("wakeup1");
    if(p->chan == p && p->state == SLEEPING) {
    p->state = RUNNABLE;
    }
    }

kill

kill其实做得很温和。它只是会把想鲨的进程的p->killed设置为1,然后如果该进程sleeping,则唤醒它。最后的死亡以及销毁由进程自己来做。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// Kill the process with the given pid.
// The victim won't exit until it tries to go
// to kernel space (see usertrap() in trap.c).
int
kill(int pid)
{
struct proc *p;

for(p = proc; p < &proc[NPROC]; p++){
acquire(&p->lock);
if(p->pid == pid){
p->killed = 1;
if(p->state == SLEEPING){
// Wake process from sleep().
p->state = RUNNABLE;
}
release(&p->lock);
return 0;
}
release(&p->lock);
}
return -1;
}
// in trap.c usertrap()
if(p->killed)
exit(-1);

可能这里有一个疑问:调用完exit后,进程会变成ZOMBIE态。谁最终把它释放了呢?其实答案很简单,只有两种:init进程或者是创建它的父进程。

如果创建它的父进程处于wait中,那么是由父进程把它销毁的,这没什么好说的。但如果创建它的父进程不在wait呢?那么父进程最后也是会调用exit的。父进程调用完exit后,会将其所有子进程过继给init进程。所以,ZOMBIE进程最终还是会迟早被init进程杀死的。

由这里,可以窥见xv6进程管理的进一步的冰山一角:

init进程是所有进程的根系进程。它一直处于wait的死循环中,因而可以将需要被杀死的进程杀死。

可见,wait和exit,实际上就构筑了进程的生命周期的最后一环。

这种巧妙地将进程生命周期这个大事完全托付给了wait和exit这两个函数的这种结构,实在是非常精妙,太牛了吧。

一些XV6的sleep循环不检查p->killed,因为代码在应该是原子操作的多步系统调用的中间。virtio驱动程序(*kernel/virtio_disk.c*:242)就是一个例子:它不检查p->killed,因为一个磁盘操作可能是文件系统保持正确状态所需的一组写入操作之一。等待磁盘I/O时被杀死的进程将不会退出,直到它完成当前系统调用并且usertrap看到killed标志

Xv6对kill的支持并不完全令人满意:有一些sleep循环可能应该检查p->killed。一个相关的问题是,即使对于检查p->killedsleep循环,sleepkill之间也存在竞争;后者可能会设置p->killed,并试图在受害者的循环检查p->killed之后但在调用sleep之前尝试唤醒受害者。如果出现此问题,受害者将不会注意到p->killed,直到其等待的条件发生。这可能比正常情况要晚一点(例如,当virtio驱动程序返回受害者正在等待的磁盘块时)或永远不会发生(例如,如果受害者正在等待来自控制台的输入,但用户没有键入任何输入)。

是的,所以这个kill的实现其实是相当玄学的。

Real world

xv6调度器实现了一个简单的调度策略:它依次运行每个进程。这一策略被称为轮询调度(round robin)。真实的操作系统实施更复杂的策略,例如,允许进程具有优先级。

我记得linux0.11用的是时间片轮转+优先级队列完美融合的方法,是真的很牛逼

复杂的策略可能会导致意外的交互,例如优先级反转(priority inversion)和航队(convoys)。当低优先级进程和高优先级进程共享一个锁时,可能会发生优先级反转,当低优先级进程持有该锁时,可能会阻止高优先级进程前进。当许多高优先级进程正在等待一个获得共享锁的低优先级进程时,可能会形成一个长的等待进程航队;一旦航队形成,它可以持续很长时间。为了避免此类问题,在复杂的调度器中需要额外的机制。

wakeup中扫描整个进程列表以查找具有匹配chan的进程效率低下。一个更好的解决方案是用一个数据结构替换sleepwakeup中的chan,该数据结构包含在该结构上休眠的进程列表,例如Linux的等待队列。

是的,linux的那个wakeup真的很牛,我现在都还记得当初学到那的时候的震撼。

wakeup的实现会唤醒在特定通道上等待的所有进程,可能有许多进程在等待该特定通道。操作系统将安排所有这些进程,它们将竞相检查睡眠条件。进程的这种行为有时被称为惊群效应(thundering herd),最好避免。

大多数条件变量都有两个用于唤醒的原语:signal用于唤醒一个进程;broadcast用于唤醒所有等待进程。

一个实际的操作系统将在固定时间内使用空闲列表找到自由的proc结构体,而不是allocproc中的线性时间搜索;xv6使用线性扫描是为了简单起见。

Lab: Multithreading

You will implement switching between threads in a user-level threads package, use multiple threads to speed up a program, and implement a barrier.

这个introduction看起来还是非常激动人心的,很早就想了解到底线程是怎么实现的了。不过做完发现思想还是很简单的,就是只用切换上下文和栈就行。可以看看提供给的代码。

Uthread: switching between threads

In this exercise you will design the context switch mechanism for a user-level threading system, and then implement it.

To get you started, your xv6 has two files user/uthread.c and user/uthread_switch.S, and a rule in the Makefile to build a uthread program.

uthread.c contains most of a user-level threading package, and code for three simple test threads. The threading package is missing some of the code to create a thread and to switch between threads.

You will need to add code to thread_create() and thread_schedule() in user/uthread.c, and thread_switch in user/uthread_switch.S.

One goal is ensure that when thread_schedule() runs a given thread for the first time, the thread executes the function passed to thread_create(), on its own stack.

Another goal is to ensure that thread_switch saves the registers of the thread being switched away from, restores the registers of the thread being switched to, and returns to the point in the latter thread’s instructions where it last left off. You will have to decide where to save/restore registers; modifying struct thread to hold registers is a good plan.

You’ll need to add a call to thread_switch in thread_schedule; you can pass whatever arguments you need to thread_switch, but the intent is to switch from thread t to next_thread.

实现的是用户级线程,其栈保存在对应父进程的地址空间中。

感想

思路

看了一遍它这里面写的题目还是有点抽象的,需要结合着给的代码看,那样就清晰多了。

首先,要补全的地方有这几个:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// 1. in thread_schedule()
if (current_thread != next_thread) { /* switch threads? */
next_thread->state = RUNNING;
t = current_thread;
current_thread = next_thread;
/* YOUR CODE HERE
* Invoke thread_switch to switch from t to next_thread:
* thread_switch(??, ??);
*/
} else
next_thread = 0;
// 2. in thread_create()
void
thread_create(void (*func)())
{
struct thread *t;

for (t = all_thread; t < all_thread + MAX_THREAD; t++) {
if (t->state == FREE) break;
}
t->state = RUNNABLE;
// YOUR CODE HERE
}
// 3. in uthread_switch.S
/*
* save the old thread's registers,
* restore the new thread's registers.
*/

.globl thread_switch
thread_switch:
/* YOUR CODE HERE */
ret /* return to ra */

这几个函数到时候会被如此调用:

1
2
3
4
5
6
7
8
9
10
11
12
int
main(int argc, char *argv[])
{
a_started = b_started = c_started = 0;
a_n = b_n = c_n = 0;
thread_init();
thread_create(thread_a);
thread_create(thread_b);
thread_create(thread_c);
thread_schedule();
exit(0);
}

所以,我们在第一个地方要做的,就是要填入swtch的签名。第二个地方要做的,就是要想办法让该线程一被启动就去执行参数的函数指针。第三个地方要做的,就是要完成上下文的切换。

所以思路其实是很直观的。我们可以模仿进程管理中用来表示上下文的context,在thread_create的时候把里面的ra设置为参数的函数指针入口,sp修改为thread结构体中的栈地址。swtch函数则完全把kernel/swtch.S超过来就行。

在这个思路中,我们是怎么做到栈的切换的呢?

每个线程在thread_create的时候,都将自己的context中的sp修改为自己的栈地址。这样一来,在它们被调度的时候,switch会自然而然地从context中读取sp作为之后运行的sp,这样就实现了栈的切换。

我觉得其他方面都不难,最坑最细节的【也是我完全没有想到的……】就是这里:

1
2
// 修改sp为栈顶
t->context.sp = (uint64)t->stack + STACK_SIZE;

需要注意,栈顶并不是t->stack

通过测试程序:

1
2
3
4
5
6
7
8
9
10
11
12
int main(){
int a[5]={1,2,3,4,5};
for(int i=0;i<5;i++){
printf("%p\n",&a[i]);
}
return 0;
}
0062feb8
0062febc
0062fec0
0062fec4
0062fec8

栈是向下增长的,因而,栈顶确实应该是数组的末尾……

这里完全没有想到,还是吃了基础的亏啊。

如果这里将t->stack作为sp,那么运行时会出现非常诡异的现象(打印的是abc三个的thread->state):

image-20230120232149776

仅有c【经测试,是仅有最后一个启动的线程】在执行,而ab的state都不是理想中的2,而是很奇怪的值。我确实有想过栈溢出问题,但是马上被我否定了。我完全没有想到是那样错的【悲】

关于swtch

Update,验收时学长问为什么这里的uswitch.S为什么无需保存tn这样的寄存器。答案是因为tn是caller-save的,线程这相当于仅仅是执行一个函数,所以只需保存callee-save的寄存器。

内核的swtch也只保存了这些callee-save的寄存器,也是同一个道理。

image-20231219000100763

image-20231219000047041

tn寄存器被保存在调用者的栈帧中。感觉也能理解为什么那题作业题说上文进程的现场是由栈保存了。

代码

增加context结构体定义,修改thread结构体
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
struct context {
uint64 ra;
uint64 sp;

// callee-saved
uint64 s0;
uint64 s1;
uint64 s2;
uint64 s3;
uint64 s4;
uint64 s5;
uint64 s6;
uint64 s7;
uint64 s8;
uint64 s9;
uint64 s10;
uint64 s11;
};


struct thread {
char stack[STACK_SIZE]; /* the thread's stack */
int state; /* FREE, RUNNING, RUNNABLE */
struct context context;
};
修改thread_create
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
void
thread_create(void (*func)())
{
struct thread *t;

for (t = all_thread; t < all_thread + MAX_THREAD; t++) {
if (t->state == FREE) break;
}
t->state = RUNNABLE;
// YOUR CODE HERE
// 将当前上下文保存入context
thread_switch((uint64)(&(t->context)),(uint64)(&(t->context)));
// 修改sp为栈顶
t->context.sp = (uint64)t->stack + STACK_SIZE;
// 修改ra为参数的函数指针入口
t->context.ra = (uint64)func;
}
修改thread_schedule
1
2
3
4
5
6
7
8
9
10
11
if (current_thread != next_thread) {         /* switch threads?  */
next_thread->state = RUNNING;
t = current_thread;
current_thread = next_thread;
/* YOUR CODE HERE
* Invoke thread_switch to switch from t to next_thread:
* thread_switch(??, ??);
*/
thread_switch((uint64)(&(t->context)),(uint64)(&(current_thread->context)));
} else
next_thread = 0;
修改thread_switch

全部照搬kernel/swtch.S,没什么好说的

Using threads

Barrier

In this assignment you’ll implement a barrier: a point in an application at which all participating threads must wait until all other participating threads reach that point too.

直接上代码,还是比较简单的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
static void
barrier()
{
// YOUR CODE HERE
//
// Block until all threads have called barrier() and
// then increment bstate.round.
//
pthread_mutex_lock(&(bstate.barrier_mutex));
bstate.nthread++;
while(bstate.nthread < nthread){
pthread_cond_wait(&(bstate.barrier_cond), &(bstate.barrier_mutex));
goto end;
}
// 此部分仅一个线程会进入
pthread_cond_broadcast(&(bstate.barrier_cond));
bstate.nthread = 0;
bstate.round++;
end:
pthread_mutex_unlock(&(bstate.barrier_mutex));
}

Update

关于pthread_cond的实现,也是使用了条件变量的思想,这个值得以后有时间了解一下。