Unix管道的实现方式


本文介绍了Unix内核中管道的实现。我对最近一篇题为“ 管道如何在Unix上工作?”的文章感到失望不是关于内部设备。我变得很感兴趣,我将自己埋藏在旧资料中以寻找答案。

我们在说啥啊?


管道-“可能是Unix上最重要的发明”-是Unix将小程序组合在一起的基本哲学的基本特征,以及熟悉的命令行:

$ echo hello | wc -c
6

此功能取决于内核提供的系统调用pipe,在管道(7)管道(2)文档页面上对此进行了描述

传送带提供单向进程间通信通道。流水线具有一个输入(写端)和一个输出(读端)。可以读取写入管道输入的数据。

使用pipe(2)返回两个文件描述符的调用创建管道:一个引用管道的输入,第二个引用输出。

上面命令的跟踪结果演示了管道的创建以及从一个进程到另一个进程的数据流:

$ strace -qf -e execve,pipe,dup2,read,write \
    sh -c 'echo hello | wc -c'

execve("/bin/sh", ["sh", "-c", "echo hello | wc -c"], …)
pipe([3, 4])                            = 0
[pid 2604795] dup2(4, 1)                = 1
[pid 2604795] write(1, "hello\n", 6)    = 6
[pid 2604796] dup2(3, 0)                = 0
[pid 2604796] execve("/usr/bin/wc", ["wc", "-c"], …)
[pid 2604796] read(0, "hello\n", 16384) = 6
[pid 2604796] write(1, "6\n", 2)        = 2

父进程调用pipe()以获取附加的文件描述符。一个子进程写入一个描述符,而另一个进程从另一个描述符读取相同的数据。包装器使用dup2“重命名”描述符3和4来匹配stdin和stdout。

如果没有管道,则外壳程序必须将一个进程的结果写入文件,然后将其传输到另一个进程,以便从文件中读取数据。结果,我们将花费更多的资源和磁盘空间。但是,管道是好的,不仅因为它们避免使用临时文件:

, read(2) , . , write(2) , .

像POSIX要求一样,这是一个重要的属性:写入管道的最大PIPE_BUF字节数(至少512)必须是原子的,以便进程可以通过管道与常规文件(不提供此类保证)相同的方式相互通信。

使用常规文件时,一个进程可以将其所有输出数据写入该文件,然后将其传输到另一个进程。或者进程可以使用外部信令机制(例如信号量)在硬并行化模式下运行,以相互告知写入或读取已完成。传送带为我们省去了所有麻烦。

我们在找什么?


我将用手指进行解释,以使您更容易想象传送带的工作方式。您将需要在内存中分配一个缓冲区和一些状态。您将需要一些函数来添加和删除缓冲区中的数据。在对文件描述符的读写操作期间,将需要一些方法来调用函数。并且需要锁来实现上述特殊行为。

现在,我们准备在灯的强光下询问内核的源代码,以确认或驳斥我们模糊的思维模型。但始终要为意外做好准备。

我们在找什么


我不知道带有Unix 6源代码的著名书籍“ Lions book ”的副本在哪里,但是由于有了Unix Heritage Society,甚至可以在源代码中在线搜索Unix的较旧版本。

在TUHS档案中徘徊类似于参观博物馆。我们可以回顾一下我们的共同历史,我尊重多年来为从旧纸盒和打印输出中逐点回收所有这些材料所做的努力。我敏锐地意识到仍然缺少的那些片段。

满足了我们对传送带古代历史的好奇心之后,我们可以看一下现代的芯子进行比较。

顺便说一句,表中pipe是系统调用号码42 sysent[]巧合?

传统的Unix内核(1970–1974)


pipe(2)PDP-7 Unix(1970年1月),Unix的第一版(1971年11月)和第二版的不完整源代码(1972年6月)中都找不到踪迹

TUHS声称Unix的第三版(1973年2月)是带有管道的第一版:

Unix的第三版是使用汇编语言编写的内核的最新版本,而第一版则使用管道。1973年,做了很多工作来改进第三版,用C重写了内核,因此出现了第四版Unix。

一位读者浏览了一份文件,道格·麦克罗伊(Doug McIlroy)提出了``通过花园软管的原理连接程序''的想法。


在布莱恩·克尼根(Brian Kernighan)的著作《Unix:历史和回忆录》中,在传送带出现的历史中,还提到了该文档:“ ...它在我贝尔实验室的办公室墙上挂了30年。”这是对麦克罗伊的采访,以及麦克罗伊在2014年撰写的工作中的另一个故事

Unix, , , , - , , . , . , , , . ? «» , , -, : « !».

. , , ( ), . . . API , .

不幸的是,第三版Unix的内核源代码丢失了。尽管我们拥有1973年11月发布的用C编写的第四版内核的源代码,但它在正式发布之前已发布了几个月,并且不包含管道实现。遗憾的是,传说中的Unix函数的源代码可能永远丢失了。

我们拥有pipe(2)两个版本的文档文本,因此您可以从搜索文档的第三版开始(对于某些单词,“手动”下划线,一串文字^ H,后跟下划线!)。该原型是pipe(2)用汇编器编写的,仅返回一个文件描述符,但是已经提供了预期的基本功能:

管道系统调用创建称为管道的输出输入机制。返回的文件描述符可用于读取和写入操作。将某些内容写入管道时,最多缓冲504字节的数据,此后暂停写入过程。从管道读取时,将获取缓冲的数据。

到第二年,用C语言重写了内核,第四版中的pipe(2)通过原型“ pipe(fildes)找到了它的现代外观

pipe , . . - , , r1 (. fildes[1]), 4096 , . , r0 (. fildes[0]), .

, ( ) ( fork) read write.

Shell具有用于定义通过管道连接的线性进程数组的语法。

从只有一端(所有写入文件描述符均已关闭)的空管道(不包含缓冲数据)进行的读取调用将返回“文件末尾”。在类似情况下的录音呼叫将被忽略。

尚存 的最早的管道实现可追溯到Unix的第五版(1974年6月),但与下一版本中出现的版本几乎相同。仅添加了注释,因此可以跳过第五版。

第六版Unix(1975)


我们开始阅读第六版 Unix源代码(1975年5月)。很大程度上要感谢Lions,发现它比早期版本的源代码要容易得多:

多年来,Lions是Bell实验室之外唯一可用的Unix核心文档。尽管第六版的许可证允许教师使用其源代码,但第七版的许可证排除了这种可能性,因此该书以非法打字副本的形式分发。

今天,您可以购买本书的重印本,并在复印机上为学生盖上封面。还要感谢Warren Tumi(发起了TUHS项目),您可以下载包含第六版源代码PDF文件。我想让您了解创建文件花费了多少精力:

15 , Lions, . TUHS , . 1988- 9 , PDP11. , , /usr/src/, 1979- , . PWB, .

. , , += =+. - , - , .

今天,我们可以在TUHS上在线阅读存档中第六版的源代码,Dennis Ritchie可以访问该源代码

顺便说一下,乍看之下,在Kernigan和Richie时期之前,C代码的主要特征是它的简洁。并非经常,我无需进行大量编辑即可嵌入代码段,以适应网站上相对狭窄的显示区域。

/usr/sys/ken/pipe.c的开头有一个解释性注释(是的,还有/ usr / sys / dmr):

/*
 * Max allowable buffering per pipe.
 * This is also the max size of the
 * file created to implement the pipe.
 * If this size is bigger than 4096,
 * pipes will be implemented in LARG
 * files, which is probably not good.
 */
#define    PIPSIZ    4096

自第四版以来,缓冲区大小未更改。但是在这里,没有任何公共文档,我们看到一旦管道使用文件作为备份存储!

对于LARG文件,它们对应于LARG inode标志,“高级寻址算法”使用该标志来处理间接块,以支持更大的文件系统。由于Ken表示最好不要使用它们,因此我很乐意接受他的承诺。

这是真实的系统调用pipe

/*
 * The sys-pipe entry.
 * Allocate an inode on the root device.
 * Allocate 2 file structures.
 * Put it all together with flags.
 */
pipe()
{
    register *ip, *rf, *wf;
    int r;

    ip = ialloc(rootdev);
    if(ip == NULL)
        return;
    rf = falloc();
    if(rf == NULL) {
        iput(ip);
        return;
    }
    r = u.u_ar0[R0];
    wf = falloc();
    if(wf == NULL) {
        rf->f_count = 0;
        u.u_ofile[r] = NULL;
        iput(ip);
        return;
    }
    u.u_ar0[R1] = u.u_ar0[R0]; /* wf's fd */
    u.u_ar0[R0] = r;           /* rf's fd */
    wf->f_flag = FWRITE|FPIPE;
    wf->f_inode = ip;
    rf->f_flag = FREAD|FPIPE;
    rf->f_inode = ip;
    ip->i_count = 2;
    ip->i_flag = IACC|IUPD;
    ip->i_mode = IALLOC;
}

评论清楚地描述了这里发生的事情。但是要理解代码并不容易,部分原因是借助« 结构用户u »的方式,并注册R0R1传输了系统调用的参数以及返回值。

让我们尝试使用ialloc()在磁盘上放置inode(inode ,并使用falloc()在内存中放置两个文件。如果一切顺利,那么我们将设置标志以将这些文件定义为管道的两端,将它们指向同一inode(其引用计数为2),并将inode标记为已更改和已使用。注意对iput()的调用在错误路径中减少新inode中的引用计数。

pipe()必须通过R0R1返回用于读取和写入的文件描述符编号。falloc()返回指向文件结构的指针,但也通过u.u_ar0[R0]文件描述符“返回” 。即,该代码保存到r文件描述符中以进行读取,并从u.u_ar0[R0]第二次调用后直接分配一个描述符以进行写入falloc()我们在创建管道时设置

的标志FPIPE控制sys2.c中rdwr()函数的行为,该函数调用特定的I / O I / O例程:

/*
 * common code for read and write calls:
 * check permissions, set base, count, and offset,
 * and switch out to readi, writei, or pipe code.
 */
rdwr(mode)
{
    register *fp, m;

    m = mode;
    fp = getf(u.u_ar0[R0]);
        /* … */

    if(fp->f_flag&FPIPE) {
        if(m==FREAD)
            readp(fp); else
            writep(fp);
    }
        /* … */
}

然后,函数readp()in pipe.c从管道读取数据。但是,从更好地跟踪实现writep()再次,由于参数传递协议的细节,代码变得更加复杂,但是一些细节可以省略。

writep(fp)
{
    register *rp, *ip, c;

    rp = fp;
    ip = rp->f_inode;
    c = u.u_count;

loop:
    /* If all done, return. */

    plock(ip);
    if(c == 0) {
        prele(ip);
        u.u_count = 0;
        return;
    }

    /*
     * If there are not both read and write sides of the
     * pipe active, return error and signal too.
     */

    if(ip->i_count < 2) {
        prele(ip);
        u.u_error = EPIPE;
        psignal(u.u_procp, SIGPIPE);
        return;
    }

    /*
     * If the pipe is full, wait for reads to deplete
     * and truncate it.
     */

    if(ip->i_size1 == PIPSIZ) {
        ip->i_mode =| IWRITE;
        prele(ip);
        sleep(ip+1, PPIPE);
        goto loop;
    }

    /* Write what is possible and loop back. */

    u.u_offset[0] = 0;
    u.u_offset[1] = ip->i_size1;
    u.u_count = min(c, PIPSIZ-u.u_offset[1]);
    c =- u.u_count;
    writei(ip);
    prele(ip);
    if(ip->i_mode&IREAD) {
        ip->i_mode =& ~IREAD;
        wakeup(ip+2);
    }
    goto loop;
}

我们想将字节写入管道的输入u.u_count首先,我们需要锁定inode(请参见plock/ prele)。

然后检查inode引用计数。当管道的两端都保持打开状态时,计数器应为2。我们保留一个链接(out rp->f_inode),因此,如果计数器小于2,则应表示读取过程已经关闭了管道的末端。换句话说,我们正在尝试在封闭的管道中进行编写,这是一个错误。错误代码EPIPE和信号首先SIGPIPE出现在Unix的第六版中。

但是,即使输送机是打开的,也可以装满。在这种情况下,我们将移开锁并进入睡眠状态,以希望另一个进程将从管道中读取并释放出足够的空间。唤醒后,我们将回到开头,再次锁定锁并开始新的录制周期。

如果管道中有足够的可用空间,则可以使用writei()向其中写入数据i_size1inode中的参数(流水线可以为0,可以为空)指示它已经包含的数据的结尾。如果有足够的记录空间,我们可以填补输送i_size1PIPESIZ。然后,我们删除该锁,并尝试唤醒任何正在等待机会从管道读取的进程。我们从头开始,看看是否设法写入所需的字节数。如果失败,则我们开始一个新的记录周期。

通常i_mode,使用inode 参数存储权限rw并且x。但是,在管道的情况下,我们的信号,某些进程正在等待编写或使用位读IREAD和,IWRITE分别。一个进程设置一个标志并调用sleep(),并且预计将来还会有其他进程调用wakeup()

真正的魔法发生在sleep()和中wakeup()。它们在slp.c中实现,著名评论的来源,“您不应理解这一点。” 幸运的是,我们不需要了解代码,只需查看一些注释即可:

/*
 * Give up the processor till a wakeup occurs
 * on chan, at which time the process
 * enters the scheduling queue at priority pri.
 * The most important effect of pri is that when
 * pri<0 a signal cannot disturb the sleep;
 * if pri>=0 signals will be processed.
 * Callers of this routine must be prepared for
 * premature return, and check that the reason for
 * sleeping has gone away.
 */
sleep(chan, pri) /* … */

/*
 * Wake up all processes sleeping on chan.
 */
wakeup(chan) /* … */

调用sleep()特定通道的进程稍后可能会被另一个调用wakeup()同一通道的进程唤醒writep()readp()通过配对呼叫协调他们的行动。请注意,通话时pipe.c始终优先,因此每个人都可以打断信号。 现在我们掌握了所有功能PPIPEsleep()sleep()

readp()

readp(fp)
int *fp;
{
    register *rp, *ip;

    rp = fp;
    ip = rp->f_inode;

loop:
    /* Very conservative locking. */

    plock(ip);

    /*
     * If the head (read) has caught up with
     * the tail (write), reset both to 0.
     */

    if(rp->f_offset[1] == ip->i_size1) {
        if(rp->f_offset[1] != 0) {
            rp->f_offset[1] = 0;
            ip->i_size1 = 0;
            if(ip->i_mode&IWRITE) {
                ip->i_mode =& ~IWRITE;
                wakeup(ip+1);
            }
        }

        /*
         * If there are not both reader and
         * writer active, return without
         * satisfying read.
         */

        prele(ip);
        if(ip->i_count < 2)
            return;
        ip->i_mode =| IREAD;
        sleep(ip+2, PPIPE);
        goto loop;
    }

    /* Read and return */

    u.u_offset[0] = 0;
    u.u_offset[1] = rp->f_offset[1];
    readi(ip);
    rp->f_offset[1] = u.u_offset[1];
    prele(ip);
}

您可能会发现从下至上更容易阅读此功能。当管道中有一些数据时,通常使用“读取和返回”分支。在这种情况下,我们使用readi()从当前f_offset读取开始读取尽可能多的数据,然后更新相应偏移量的值。

在后续读取中,如果读取偏移量已达到i_size1inode 的值,则管道将为空。我们将位置重置为0,并尝试唤醒要写入管道的任何进程。我们知道,当输送机装满时,它writep()会睡着ip+1。现在管道是空的,我们可以将其唤醒,以使其恢复记录周期。

如果没有什么可以阅读的,它readp()可以设置一个标志IREAD并入睡ip+2我们知道writep()当他将一些数据写入管道时会唤醒他什么readi()和writei()

上的注释将有助于理解,我们可以像对待普通I / O函数一样对待它们,而不是通过“ 传递参数,该函数接受文件,位置,内存中的缓冲区并计算要读取或写入的字节数。u

/*
 * Read the file corresponding to
 * the inode pointed at by the argument.
 * The actual read arguments are found
 * in the variables:
 *    u_base        core address for destination
 *    u_offset    byte offset in file
 *    u_count        number of bytes to read
 *    u_segflg    read to kernel/user
 */
readi(aip)
struct inode *aip;
/* … */

/*
 * Write the file corresponding to
 * the inode pointed at by the argument.
 * The actual write arguments are found
 * in the variables:
 *    u_base        core address for source
 *    u_offset    byte offset in file
 *    u_count        number of bytes to write
 *    u_segflg    write to kernel/user
 */
writei(aip)
struct inode *aip;
/* … */

至于“保守”的锁,然后readp()writep()块索引节点,只要他们完成工作或没有得到结果(即原因wakeup)。plock()它们的prele()工作原理很简单:使用一组不同的调用,sleepwakeup允许我们唤醒所有需要删除的锁的进程:

/*
 * Lock a pipe.
 * If its already locked, set the WANT bit and sleep.
 */
plock(ip)
int *ip;
{
    register *rp;

    rp = ip;
    while(rp->i_flag&ILOCK) {
        rp->i_flag =| IWANT;
        sleep(rp, PPIPE);
    }
    rp->i_flag =| ILOCK;
}

/*
 * Unlock a pipe.
 * If WANT bit is on, wakeup.
 * This routine is also used to unlock inodes in general.
 */
prele(ip)
int *ip;
{
    register *rp;

    rp = ip;
    rp->i_flag =& ~ILOCK;
    if(rp->i_flag&IWANT) {
        rp->i_flag =& ~IWANT;
        wakeup(rp);
    }
}

一开始我不明白为什么它readp()在通话prele(ip)之前没有通话wakeup(ip+1)writep()导致循环的第一件事是,plock(ip)如果readp()尚未删除其块,则会导致死锁,因此代码必须以某种方式正常工作。如果您看一下它wakeup(),很显然他只将睡眠过程标记为可以执行,因此将来它会sched()真正启动它。因此,它readp()会导致wakeup(),解锁,设置IREAD和调用sleep(ip+2)-在writep()恢复周期之前所有这些

这样就完成了第六版中有关输送机的描述。简单的代码,影响深远。

Unix第七版(1979年1月)是新的主要发行版(四年后),其中出现了许多新的应用程序和内核属性。同样,在使用类型转换,union'ov和类型化结构指针方面也发生了重大变化。但是,管道代码并没有太大变化。我们可以跳过此版本。

Xv6,一个简单的Unix形内核


Unix 第六版影响了Xv6内核创建,但是它是用现代C语言编写的,可以在x86处理器上运行。代码很容易阅读,很清楚。此外,与带有TUHS的Unix源不同,您可以对其进行编译,修改并在PDP 11/70之外的其他程序上运行。因此,该内核已在大学中广泛用作操作系统的教学材料。来源在Github上

该代码包含pipe.c的一种清晰且经过深思熟虑的实现,由内存中的缓冲区而不是磁盘上的inode备份。在这里,我仅给出“结构管道”和函数的定义pipealloc()

#define PIPESIZE 512

struct pipe {
  struct spinlock lock;
  char data[PIPESIZE];
  uint nread;     // number of bytes read
  uint nwrite;    // number of bytes written
  int readopen;   // read fd is still open
  int writeopen;  // write fd is still open
};

int
pipealloc(struct file **f0, struct file **f1)
{
  struct pipe *p;

  p = 0;
  *f0 = *f1 = 0;
  if((*f0 = filealloc()) == 0 || (*f1 = filealloc()) == 0)
    goto bad;
  if((p = (struct pipe*)kalloc()) == 0)
    goto bad;
  p->readopen = 1;
  p->writeopen = 1;
  p->nwrite = 0;
  p->nread = 0;
  initlock(&p->lock, "pipe");
  (*f0)->type = FD_PIPE;
  (*f0)->readable = 1;
  (*f0)->writable = 0;
  (*f0)->pipe = p;
  (*f1)->type = FD_PIPE;
  (*f1)->readable = 0;
  (*f1)->writable = 1;
  (*f1)->pipe = p;
  return 0;

 bad:
  if(p)
    kfree((char*)p);
  if(*f0)
    fileclose(*f0);
  if(*f1)
    fileclose(*f1);
  return -1;
}

pipealloc()设置执行,其包括功能的其余部分的状态piperead()pipewrite()pipeclose()实际的系统调用sys_pipe是在sysfile.c中实现的包装器我建议阅读其所有代码。复杂度是在第六版的源代码级别,但是它更容易阅读并且更令人愉快。

Linux 0.01


您可以找到Linux 0.01的源代码。研究他fs/ 的管道的实施将具有指导意义pipe.c在这里,inode用于表示管道,但是管道本身是用现代C语言编写的。如果您掌握了第六版代码,那么您将不会遇到困难。该函数的外观write_pipe()如下:

int write_pipe(struct m_inode * inode, char * buf, int count)
{
    char * b=buf;

    wake_up(&inode->i_wait);
    if (inode->i_count != 2) { /* no readers */
        current->signal |= (1<<(SIGPIPE-1));
        return -1;
    }
    while (count-->0) {
        while (PIPE_FULL(*inode)) {
            wake_up(&inode->i_wait);
            if (inode->i_count != 2) {
                current->signal |= (1<<(SIGPIPE-1));
                return b-buf;
            }
            sleep_on(&inode->i_wait);
        }
        ((char *)inode->i_size)[PIPE_HEAD(*inode)] =
            get_fs_byte(b++);
        INC_PIPE( PIPE_HEAD(*inode) );
        wake_up(&inode->i_wait);
    }
    wake_up(&inode->i_wait);
    return b-buf;
}

即使不查看结构的定义,您也可以弄清楚如何使用inode引用计数器检查写操作是否导致SIGPIPE除字节工作外,此功能还很容易与上述想法相关联。甚至逻辑sleep_on/ wake_up看起来也不那么陌生。

现代Linux,FreeBSD,NetBSD,OpenBSD内核


我很快浏览了一些现代内核。他们都没有磁盘实现(不足为奇)。 Linux有其自己的实现。尽管三个现代的BSD内核都包含基于John Dyson编写的代码的实现,但是多年来,它们之间已经变得太不相同了。

要读取fs/ pipe.c(在Linux上)或sys/ kern/ sys_pipe.c(在* BSD),需要真正的奉献。在当今的代码中,性能和对矢量和异步I / O等功能的支持非常重要。以及内存分配,锁和内核配置的详细信息-所有这些都非常不同。这不是大学开设操作系统入门课程所需要的。

无论如何,在所有这些非常不同的现代内核中,我发现了几个旧模式(例如,在写入封闭的管道时生成SIGPIPE并返回)对我来说很有趣EPIPE我可能永远不会看到实时的PDP-11计算机,但是从我出生前几年编写的代码中仍然可以学到很多东西。

由Divi Kapoor在2011年撰写的文章“ 管道和FIFO的Linux内核实现 ”概述了(到目前为止)管道如何在Linux上工作。最近的Linux提交说明其能力超过临时文件的能力流水线交互模型; 并且还显示了管道与第六版Unix内核中的“非常保守的锁定”相去甚远。

All Articles