Объединить входные данные из нескольких файлов / каналов без зазубрин линий или блокировок?

9

Существует ли инструмент, который будет принимать входные данные из нескольких файлов или каналов и записывать их в стандартный вывод, не блокируя операции чтения, чтобы отдельные строки ввода выходили нетронутыми? Я в основном хочу мультиплексировать кучу входов на один выход без заусенцев линий.

$ combine file1 <(prog2) ... > nice-output.txt
  1. Меня не волнует порядок вывода
  2. Он не должен блокироваться, пока на некоторых входах есть данные
  3. Это должно быть эффективно (то есть, я могу понизить ваш Perl на одну строку;)
Джей Хакер
источник

Ответы:

4

Вы должны быть в состоянии сделать это multitailдовольно легко.

Калеб
источник
1
Можете ли вы предложить, какие аргументы я бы использовал с multitail? Похоже, что он не имеет неинтерактивного режима, зависает при попытке записи в стандартный вывод и завершает чтение из канала.
Джей Хакер
Начните с -Lвыполнения команды и объедините выходные данные с текущим потоком и -aзапишите выходные данные в файл. Я посмотрю больше завтра. Если вы приведете более подробный пример, я постараюсь проработать его.
Калеб
4

Если процессы записывают строки в одном writeвызове, который требует, чтобы процессы использовали буферизацию строк (обычно отключенную, если их стандартный вывод не является терминалом), вы можете просто направить их все в канал.

{ { sleep .1; echo one; sleep .1; echo two; } &
  { echo hello; sleep .15; echo world; };
  wait; } | cat

Если процессы выполняют буферизацию строки только при записи в терминал, проще всего это использовать script. Это немного неуклюже: он может писать только в файл.

script -q -c '
    { { sleep .1; echo one; sleep .1; echo two; } &
      { echo hello; sleep .15; echo world; };
      wait; }'
tail -n +2 typescript

Если программы пишут длинные строки или просто не используют буферизацию строк, этот подход не сработает. Вам понадобится программа-сборщик, которая читает и буферизует строки с каждого входа отдельно и выполняет синхронизацию на концах строк. Там нет стандартной утилиты с этим функционалом. Я второе предложение Калебаmultitail .

Вот скрипт Python, который читает строки, созданные несколькими командами, и выплевывает их в свой стандартный вывод, не разбивая строку. Я не проверял это много, поэтому будьте осторожны. Я не тестировал это вообще.

#!/usr/bin/env python
import Queue, itertools, os, subprocess, sys, threading
# Queue of (producer_id, line). line==None indicates the end of a producer.
lq = Queue.Queue()

# Line producer
def run_task(i, cmd):
    p = subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=True)
    line = p.stdout.readline()
    while line <> "":
        lq.put((i, line))
        line = p.stdout.readline()
    lq.put((i, None))

# Start a producer for each command passed as an argument
for i in range(1,len(sys.argv)):
    threading.Thread(target=run_task, args=(i, sys.argv[i])).start()
sources = len(sys.argv) - 1
# Consumer: print lines as they come in, until no producer is left.
while sources > 0:
    (k, line) = lq.get()
    if line == None: sources -= 1
    else: sys.stdout.write(str(k) + ":" + line)

Пример использования:

./collect.py 'sleep 1; ls /; sleep 1; ls /' \
             '/bin/echo -n foo; sleep 1; /bin/echo -n bar; sleep 1; /bin/echo qux'
Жиль "ТАК - перестань быть злым"
источник
1

Да, мультитейл кажется связанным с понятием «окна» как подмножества терминала; Я не мог заставить его играть как компонент конвейера.

Так выглядит как мы Хафт сделать этому себе трещины суставы

/* Copyright © 2015 sqweek@gmail.com
** Use/modify as you see fit but leave this attribution.
** If you change the interface and want to distribute the
** result please change the binary name too! */
#include <err.h>
#include <errno.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <fcntl.h>

#include <sys/types.h>
#include <sys/stat.h>
#include <sys/select.h>

/* typedefs are for pussies */
struct {
    char *filename; /* for clarity of errors */
    char *data;
    long len;
    long cap;
} saved[FD_SETSIZE] = {0};

void
ewriten(int fd, char *buf, int n)
{
    int done = 0, c;
    while (done < n) {
        if ((c=write(fd, buf + done, n - done)) <= 0 && errno != EINTR) {
            err(1, "write");
        }
        done += c;
    }
}

int
empty(fd_set *fdset, int maxfd)
{
    int i;
    for (i=0; i <= maxfd; i++) {
        if (FD_ISSET(i, fdset)) return 0;
    }
    return 1;
}

void
combine(fd_set *fdset, int maxfd)
{
    char buf[4096], *cp;
    fd_set ready;
    int n, i, fd, left;
    while (!empty(fdset, maxfd)) {
        ready = *fdset;
        /* timeouts are for pussies */
        if (select(maxfd + 1, &ready, NULL, NULL, NULL) == -1) err(1, "select");
        for (fd=0; fd <= maxfd; fd++) {
            if (!FD_ISSET(fd, &ready)) continue;

            switch (n=read(fd, &buf, sizeof(buf))) {
            case -1:
                if (errno == EINTR)
                    break; /* ignore interrupts; we'll re-read next iteration */
                if (saved[fd].filename) err(1, "read: %s", saved[fd].filename);
                err(1, "read: %d", fd);
            case 0:
                if (saved[fd].len > 0) {
                    /* someone forgot their newline at EOF... */
                    ewriten(1, saved[fd].data, saved[fd].len);
                    saved[fd].data[0] = '\n'; /* put it back for them */
                    ewriten(1, saved[fd].data, 1);
                }
                free(saved[fd].data);
                FD_CLR(fd, fdset);
                break;
            default:
                for (cp=buf + n - 1; cp >= buf && *cp != '\n'; cp--); /* find last newline */
                left = n - (cp - buf + 1);
                if (cp >= buf) {
                    /* we found one! first dump any saved data from the last read */
                    if (saved[fd].len > 0) {
                        ewriten(1, saved[fd].data, saved[fd].len);
                        saved[fd].len = 0;
                    }
                    ewriten(1, buf, cp - buf + 1);
                }
                if (left > 0) {
                    /* now save any leftover data for later */
                    int need = saved[fd].len + left;
                    if (saved[fd].cap < need &&
                       (saved[fd].data=realloc(saved[fd].data, need)) == NULL) {
                        errx(1, "realloc: failed on %d bytes", need);
                        /* it was good enough for quake... */
                    }
                    saved[fd].cap = need;
                    memcpy(saved[fd].data + saved[fd].len, buf + n - 1 - left, left);
                    saved[fd].len += left;
                }
            }
        }
    }
}

void
addfd(int fd, fd_set *fdset, int *maxfd)
{
    FD_SET(fd, fdset);
    if (*maxfd < fd) {
        *maxfd = fd;
    }
}

int
main(int argc, char **argv)
{
    fd_set fdset;
    char **arg = argv + 1;
    char *cp;
    struct stat st;
    int fd, maxfd = -1;
    FD_ZERO(&fdset);
    while (*arg != NULL) {
        /* getopt is for pussies */
        if (strncmp("-u", *arg, 2) == 0) {
            *arg += 2;
            if (**arg == '\0' && *++arg == NULL ) errx(1, "-u requires argument (comma separated FD list)");
            /* reentrancy is for pussies */
            for (cp=strtok(*arg, ","); cp != NULL; cp=strtok(NULL, ",")) {
                fd = atoi(cp);
                if (fstat(fd, &st) != 0) err(1, "%d", fd);
                addfd(fd, &fdset, &maxfd);
            }
            arg++;
        } else if (strcmp("-", *arg) == 0) {
            if (fstat(0, &st) != 0) err(1, "stdin", fd);
            addfd(0, &fdset, &maxfd);
            saved[0].filename = "stdin";
            arg++;
        } else if (strcmp("--", *arg) == 0) {
            arg++;
            break;
        } else if (**arg == '-') {
            errx(1, "unrecognized argument %s", *arg);
        } else {
            break; /* treat as filename */
        }
    }
    /* remaining args are filenames */
    for (; *arg != NULL; arg++) {
        /* stdio is for pussies */
        if ((fd=open(*arg, O_RDONLY)) == -1) err(1, "open: %s", *arg);
        addfd(fd, &fdset, &maxfd);
        saved[fd].filename = *arg;
    }
    combine(&fdset, maxfd);
    return 0;
}

Ах, это было хорошо.

(примечание: тестируется на двух наборах входных данных. ошибок может или не может быть)

sqweek
источник