fiber 的另一种实现

回到公司之后想起来之前看到 boost.fiber 以及与好同学的一些对话,于是抱着疑问搜索了一下,看起来还真的有人问了类似的问题:Google 内部使用的 fiber 和 boost.fiber (在概念上)是一码事吗?结果一翻搜索还真的发现了一些有意思的东西。这则新闻提到了 Google 开源 fiber 相关技术的细节。其实早在 9 年前的 Linux Plumbers Conference 上(2013)就有 Googler 做了相关的报告

User-level threads … with threads

然后转眼到了三年前 Google 提交了内核相关的一个 futex swap syscall 的 PR。这一点还挺神奇的,因为似乎 MR 之类的东西业界跟进很快,而这个东西都快 10 年了,似乎业界采用的做法与 Google 这么多年前就想清楚如何做的策略还是有很大不同。这里拿 boost.fiber 举例,它更接近于 goroutine 使用的 user thread 概念,一个比较大的问题是调试起来调试器基本不支持(但是支持 thread)。而 Google 使用的策略是通过特殊的 syscall 完成更有效率的 context switch(接近 user space 实现的 context switch)。

futex

Linux 的系统调用中提供了 futex(fast user space locking),这是上层库,诸如 POSIX semaphore、线程同步工具 mutex、condition variable、read write locks、barrier 之类实现的基础。在 futex 的手册里面给了一个实现一个类似 barrier 的例子

#define _GNU_SOURCE
#include <stdio.h>
#include <errno.h>
#include <stdatomic.h>
#include <stdint.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/wait.h>
#include <sys/mman.h>
#include <sys/syscall.h>
#include <linux/futex.h>
#include <sys/time.h>

#define errExit(msg)    do { perror(msg); exit(EXIT_FAILURE); \
                        } while (0)

static uint32_t *futex1, *futex2, *iaddr;

static int
futex(uint32_t *uaddr, int futex_op, uint32_t val,
      const struct timespec *timeout, uint32_t *uaddr2, uint32_t val3) {
    return syscall(SYS_futex, uaddr, futex_op, val,
                   timeout, uaddr2, val3);
}

static void
fwait(uint32_t *futexp) {
    long s;
    while (1) {

        /* Is the futex available? */
        const uint32_t one = 1;
        if (atomic_compare_exchange_strong(futexp, &one, 0))
            break;      /* Yes */

        /* Futex is not available; wait. */
        s = futex(futexp, FUTEX_WAIT, 0, NULL, NULL, 0);
        if (s == -1 && errno != EAGAIN)
            errExit("futex-FUTEX_WAIT");
    }
}

static void
fpost(uint32_t *futexp) {
    long s;
    const uint32_t zero = 0;
    if (atomic_compare_exchange_strong(futexp, &zero, 1)) {
        s = futex(futexp, FUTEX_WAKE, 1, NULL, NULL, 0);
        if (s  == -1)
            errExit("futex-FUTEX_WAKE");
    }
}

int
main(int argc, char *argv[]) {
    pid_t childPid;
    int nloops;

    setbuf(stdout, NULL);

    nloops = (argc > 1) ? atoi(argv[1]) : 5;

    iaddr = mmap(NULL, sizeof(*iaddr) * 2, PROT_READ | PROT_WRITE,
                MAP_ANONYMOUS | MAP_SHARED, -1, 0);
    if (iaddr == MAP_FAILED)
        errExit("mmap");

    futex1 = &iaddr[0];
    futex2 = &iaddr[1];

    *futex1 = 0;        /* State: unavailable */
    *futex2 = 1;        /* State: available */

    childPid = fork();
    if (childPid == -1)
        errExit("fork");

    if (childPid == 0) {        /* Child */
        for (int j = 0; j < nloops; j++) {
            fwait(futex1);
            printf("Child  (%jd) %d\n", (intmax_t) getpid(), j);
            fpost(futex2);
        }

        exit(EXIT_SUCCESS);
    }

    /* Parent falls through to here. */
    for (int j = 0; j < nloops; j++) {
        fwait(futex2);
        printf("Parent (%jd) %d\n", (intmax_t) getpid(), j);
        fpost(futex1);
    }

    wait(NULL);
    exit(EXIT_SUCCESS);
}

这里使用的 mmap 获得了一个内存共两个进程使用(看起来和正常的应用程序的写法是不是很不一样啊),然后两者通过 futex 的 wait/wake 操作实现唤醒。

关于 fiber

这个内核的补丁提供了 SWAP 操作,与常见的 WAKE / WAIT 不同的是后者一般唤醒的对象不能指定,而 SWAP 可以有指向性的选择,这类似 user space threads / fibers 提出的 cooperative scheduling 的概念。似乎到现在还没看到开源的库被放出。

fiber 的另一种实现

留下评论