《操作系统真像还原》操作系统实现——线程和锁

Posted on Jun 3, 2021

这篇文章写的比较乱且简略,要看的话建议跟着代码一起看。本文代码在此处

线程和进程是操作系统的重要概念。

线程和进程的区别

这似乎是一个面试中常见的问题了哈哈哈,这里我可能无法给出教科书级别的解释(实际上那也未必是最好的解释),只写一下自己的理解。

先考虑一下线程存在的意义,如果一个计算机系统是单线程的(即不能并发),显然这个机器也是可以工作的,但是每个线程在工作的时候可能会进行各种 IO 操作,此时进程会因为需要等待 IO(等待的时间的数量级可能是秒级的,相对于处理器和时钟的频率来讲非常巨大)而造成处理器的空转,这是极度浪费的。如果我们可以在等待的时间将处理器分配给别的线程,就可以大幅提高效率。又或者计算机上存在两个线程,一个需要运行数天时间,一个仅需几秒,由于前一个线程先上处理器运行,所以仅需几秒的线程就需要等待大量的时间才能执行。并发(由多线程实现)可以有效的解决这个问题。

而进程存在的意义,则主要是系统资源的隔离。一种常见的隔离方式是给每个进程分配不同的页表,这样虽然每个进程可能会有相同的虚拟地址,但是他们映射到的物理地址却是截然不同的,这样就将主存有效的隔离了开来。

有趣的是,在调度时,实际上是以线程为单位的,在此时进程实际上不再存在了。

线程的类型

根据内存管理器所处的优先级,分为两种线程,内核线程和用户态线程

  • 内核线程:优点是用户使用起来方便,缺陷在于由于需要进行特权级的切换,相对比较慢
  • 用户态线程:优点是速度上会快许多(节省了系统调用的开销),缺陷是由于需要用户自己管理线程的调度,增加了开发难度,一般是由一些库来进行统一管理,比如 glibc 的 pthread 等。

两种管理方式各有优劣,作为折中,现代操作系统将两者结合,在阻塞的时候使用内核线程,非阻塞时使用用户态线程。

线程的管理

主要需要:

  1. 线程的创建和销毁
  2. 线程的特权级变更
  3. 线程的轮询和调度

PCB

进程控制块(Process Control Block,PCB)是一种用于控制进程的结构体,相当于一个进程的身份证,我们用它也可以一起控制线程。一种简单的结构如下

typedef struct task_struct
{
    uint32_t *self_kernel_stack;
    enum task_status status;
    uint8_t priority; /* how many ticks the thread running per round */
    char name[16];

    uint8_t cpu_ticks_left;
    uint8_t cpu_ticks_elapsed;

    struct list_elem general_tag;
    struct list_elem all_list_tag;

    size_t PDE_addr;
    size_t canary;
}PCB;

使用 self_kernel_stack 来指向线程在内核态下使用的栈的栈顶,这样可以解决线程在特权级变更时的栈转移问题。

结构体中的 list_elem 结构体是一个双向链表,通过这个链表将所有线程的 PCB 串了起来,即可在轮询的时候进行遍历了。

cpu_ticks_left 表示当前的线程还可以运行多少个 ticks,以一个时钟中断为 1 tick。cpu_ticks_elapsed 则是线程已运行的 ticks。

用 task_status 枚举线程的状态

enum task_status
{
    TASK_RUNNING,
    TASK_READY,
    TASK_BLOCKD,
    TASK_WAITING,
    TASK_HANGING,
    TASK_DIED
};

轮询和调度

这里的管理使用简单的轮询机制,每次时钟中断更新当前线程的 cpu_ticks_left,并在这个线程结束时把它换下 CPU。

上下文保护

调度的过程最重要的是上下文保护,此处的调度是对同一个优先级,同一个进程的不同线程的调度,需要进行两次上下文保护

首先是中断时的上下文保护

%macro VECTOR 2                                 ; VECTOR INTnumber HAS(NONE)_ERROR_CODE
section .text
INTR%1Entry:
    %2                                          ; make all kinds of INTeRrupt
                                                ; has the same stack struct
                                                ; means a int with error code, push noting
                                                ; a int without error code, push a "error code"
    push ds
    push es
    push fs
    push gs
    pushad                                      ; save the context
  
    ; send EOI(End Of Interrupt) to the 8259A
    mov al,0x20                                 ; EOI
    out 0xa0,al                                 ; send to slave
    out 0x20,al                                 ; send to master

    push %1                                     ; push it anyway
    call [idt_table + %1 * 4]                   ; the the C function
    jmp IntExit

section .data
    dd INTR%1Entry
%endmacro

以上为中断的入口(IDT 向量表中指向的函数),这里是将线程的上下文完全保护的,所以在离开时也恢复的所有的寄存器

section .text
global IntExit                            
; ---------- this function mainly restored the context ----------
IntExit:
    add esp,4                                   ; pass the int number
    popad
    pop gs
    pop fs
    pop es
    pop ds                                      ; restore the context
    add esp,4                                   ; pass the error code
    iretd

而进入中断后,对于时钟中断的中断处理函数就是我们的线程调度器

static void IntTimerHandler()
{
    struct task_struct* current_thread = GetCurrentThreadPCB();
  
    /* detect stack overflow */
    ASSERT(current_thread->canary == STACK_CANARY);

    current_thread->cpu_ticks_elapsed++;
    ticks++;

    if (current_thread->cpu_ticks_left == 0)
    {
        ScheduleThread();
    }
    else
    {
        current_thread->cpu_ticks_left--;
    }
}

到这里,如果不需要进行线程调度,那么直接退出 IntExit 函数上,恢复线程上下文即可。

如果需要进行线程调度,就会执行 ScheduleThread() 函数

void ScheduleThread()
{
    ASSERT(GetIntStatus() == INT_OFF);
  
    struct task_struct* current_thread = GetCurrentThreadPCB();
    if (current_thread->status == TASK_RUNNING)
    {
        ASSERT(!elem_find(&ready_thread_list, &current_thread->general_tag));

        list_append(&ready_thread_list, &current_thread->general_tag);
        current_thread->cpu_ticks_left = current_thread->priority;
        current_thread->status = TASK_READY;
    }
    else
    {
        // noting to do for now
    }

    ASSERT(!list_empty(&ready_thread_list));
    thread_tag = NULL;

    /* get the first READY task, send it to the CPU */
    thread_tag = list_pop(&ready_thread_list);
    struct task_struct* next_thread = elem2entry(struct task_struct, \
                                          general_tag, thread_tag);
    next_thread->status = TASK_RUNNING;
    switch_to(current_thread, next_thread);
}

前面是对双向链表的操作,需要把当前的线程,也就是将要被换下 CPU 的线程放到 ready_thread_list 的队尾,并且取出 ready_thread_list 的队头,准备进行线程的切换,这个切换函数也比较简单

[bits 32]
section .text
global switch_to
switch_to:
; callee only backup esi, edi, ebx, ebp for caller
    push esi
    push edi
    push ebx
    push ebp

    mov eax,[esp + 20]      ; get current_thread
    mov [eax],esp           ; store current stack top to self_kernel_stack

    mov eax,[esp + 24]      ; get next_thread
    mov esp,[eax]           ; get next_thread's self_kernel_stack back
    pop ebp
    pop ebx
    pop edi
    pop esi   
    ret

可见这里仅备份了当前线程的 esi,edi,ebx,ebp,然后更新了当前线程 PCB 的栈顶指针,并用下一个线程的 PCB 的栈顶指针更新 esp,然后也仅还原 esi,edi,ebx,ebp。这里看起来比较难以理解,好像是有问题的,但是实际上这是一个很巧妙的实现。

首先 switch_to 函数在执行了 mov esp,[eax] 后其实就已经转到了下一个线程了,因为此时已经在另一个线程的栈上了。

然后用一张图来解释

每一个被换下的线程的中断处理函数出口都是还没有被执行的,所以在切换完线程后,线程的上下文会通过未执行的中断处理函数出口来恢复。

接下来再解释 switch_to 中仅保护 esi,edi,ebx,ebp 的原因,由于这个函数实际上是由 ScheduleThread 函数调用的,根据 gcc 编译器的 ABI 约定,主调函数会自己保存处理 esi,edi,ebx,ebp 之外的寄存器,所以这里只要保存这四个寄存器就可以了。

main 函数为

#include "print.h"
#include "init.h"
#include "debug.h"
#include "memory.h"
#include "thread.h"
#include "interrupt.h"

void KThreadTest(void *arg);

int _start()
{
    sys_putstr("this is kernel!\n");
    char a[16];
    InitAll();

    ThreadStart("KThreadTest", 31, KThreadTest, "argA ");
    ThreadStart("KThreadTest", 8, KThreadTest, "argB ");

    EnableInt();
    while(1)
    {
        sys_putstr("main ");
    }
    return 0;
}

void KThreadTest(void *arg)
{
    char *para = (char *) arg;
    while(1)
    {
        sys_putstr(arg);
    }
}

执行的效果为

可以看到线程调度起效,特别的,出现了 GP 错误,这是输出没有加锁造成的临界区错误。

使用锁进行临界区保护

有这些概念需要过一下,很简单很好理解

  • 公共资源:会被超过一个线程访问的内存资源
  • 临界区:访问公共资源的代码
  • 信号量:一个有初值的计数器,减到 0 时代表信号的改变(从可使用变成不可使用)
  • P 操作:减小信号量
  • V 操作:增加信号量
  • 锁:维护信号量和锁的持有者、等待者的数据结果
  • 互斥锁:信号量初值为 1 的锁(即锁最多只有一个持有者)
  • 阻塞:线程由于各种原因(此处为取锁失败)将自己阻塞,注意这是线程自己的行为,一个线程不能被别的线程阻塞;对应的别的线程可以恢复被阻塞的线程,此处使用锁的持有者在释放锁时恢复的。

之前的输出中出现 GP 错误(GP 的一种情况是非法内存访问)就是典型的临界区错误,考虑这样一个情况,线程 A 开启了显卡的光标设置端口,在写入光标值低 8 位前一刻由于时间片耗尽,被换下处理器,线程 B 也开启显卡的光标设置端口,在写入光标值高 8 位前一刻由于时间片耗尽,A 又被换上处理器,这样低 8 位值就会被写到 8 八位上,造成光标值更新错误。

一种简单粗暴的解决方法是在所有的临界区操作前关中断,操作后开终端,这样是有效的,但是实际上违背了多线程的充分利用系统资源的目标,因为许多临界区操作都是设备 IO 操作,如果暴力的开关中断,那么在此线程等待设备 IO 的时候别的线程就不能执行,CPU 仍然是空转。

所以就可以引入锁,首先解决输出临界区的问题,由于只有一个输出设备,所以可以使用互斥锁。

取得锁的操作为

void sys_lock_lock(struct lock *plock)
{
    if (plock->holder != GetCurrentThreadPCB())
    {
        SemaphoreDown(&plock->semaphore); 
        plock->holder = GetCurrentThreadPCB();
        ASSERT(plock->holder_repeat_request_sum == 0);
        plock->holder_repeat_request_sum = 1;
    }
    else
    {
        plock->holder_repeat_request_sum++;
    }
}

函数 SemaphoreDown 为 P 操作,若信号量为 1,代表没有线程持有该锁,将信号量减 1 并返回,获得锁,否则阻塞自己

void SemaphoreDown(struct semaphore* psema)
{
    enum int_status old_int_statu = DisableInt();
    PCB* current_thread = GetCurrentThreadPCB();
    while(psema->value == 0)
    {
        ASSERT(!elem_find(&psema->waiting_thread_list, &current_thread->general_tag));
        if (elem_find(&psema->waiting_thread_list, &current_thread->general_tag))
        {
            PANIC("thread already in wating list");
        }
        list_append(&psema->waiting_thread_list, &current_thread->general_tag);
        BlockThread(TASK_BLOCKD);
    }
    psema->value--;
    SetIntStatus(old_int_statu);
}

类似的有释放锁的操作

void sys_lock_unlock(struct lock* plock)
{
    ASSERT(plock->holder == GetCurrentThreadPCB());
    if (plock->holder_repeat_request_sum > 1)
    {
        plock->holder_repeat_request_sum--;
        return;
    }
    ASSERT(plock->holder_repeat_request_sum == 1)
    plock->holder_repeat_request_sum = 0;
    plock->holder = NULL;
    SemaphoreUp(&plock->semaphore);
}
void SemaphoreUp(struct semaphore* psema)
{
    enum int_status old_int_statu = DisableInt();
    if (!list_empty(&psema->waiting_thread_list))
    {
        PCB* thread_blocked = elem2entry(PCB, general_tag, \
        list_pop(&psema->waiting_thread_list));
        UnblockThread(thread_blocked); 
    }
    psema->value++;
    SetIntStatus(old_int_statu);
}

新增一个控制台设备,用于输出

#include "console.h"
#include "print.h"
#include "stdint.h"
#include "sync.h"

static struct lock console_lock;

void ConsoleInit()
{
    LockInit(&console_lock);
}

void ConsoleAcquire()
{
    sys_lock_lock(&console_lock);
}

void ConsoleRelease()
{
    sys_lock_unlock(&console_lock);
}

int console_putstr(char *str)
{
    ConsoleAcquire();
    int output_num = sys_putstr(str);
    ConsoleRelease();
    return output_num;
}

void console_putchar(char char_asi)
{
    ConsoleAcquire();
    sys_putchar(char_asi);
    ConsoleRelease();
}

int console_putint(int num)
{
    ConsoleAcquire();
    int output_num = sys_putint(num);
    ConsoleRelease();
    return output_num;
}

修改 main 为

#include "print.h"
#include "init.h"
#include "debug.h"
#include "memory.h"
#include "thread.h"
#include "interrupt.h"
#include "console.h"

void KThreadTest(void *arg);

int _start()
{
    sys_putstr("this is kernel!\n");
    InitAll();

    ThreadStart("KThreadTestA", 31, KThreadTest, "argA ");
    ThreadStart("KThreadTestB", 8, KThreadTest, "argB ");

    EnableInt();
    while(1)
    {
        console_putstr("main ");
    }
    return 0;
}

void KThreadTest(void *arg)
{
    char *para = (char *) arg;
    while(1)
    {
        console_putstr(arg);
    }
}

后就可以正常输出了。