Почему я теряю данные при использовании этой конструкции bash pipe?

11

Я пытаюсь объединить несколько программ примерно так (пожалуйста, игнорируйте любые дополнительные включения, это тяжелая работа в процессе):

pv -q -l -L 1  < input.csv | ./repeat <(nc "host" 1234)

Где источник программы повтора выглядит следующим образом:

#include <fcntl.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>

#include <iostream>
#include <string>

inline std::string readline(int fd, const size_t len, const char delim = '\n')
{
    std::string result;
    char c = 0;
    for(size_t i=0; i < len; i++)
    {
        const int read_result = read(fd, &c, sizeof(c));
        if(read_result != sizeof(c))
            break;
        else
        {
            result += c;
            if(c == delim)
                break;
        }
    }
    return result;
}

int main(int argc, char ** argv)
{
    constexpr int max_events = 10;

    const int fd_stdin = fileno(stdin);
    if (fd_stdin < 0)
    {
        std::cerr << "#Failed to setup standard input" << std::endl;
        return -1;
    }


    /* General poll setup */
    int epoll_fd = epoll_create1(0);
    if(epoll_fd == -1) perror("epoll_create1: ");
    {
        struct epoll_event event;
        event.events = EPOLLIN;
        event.data.fd = fd_stdin;
        const int result = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd_stdin, &event);
        if(result == -1) std::cerr << "epoll_ctl add for fd " << fd_stdin << " failed: " << strerror(errno) << std::endl;
    }

    if (argc > 1)
    {
        for (int i = 1; i < argc; i++)
        {
            const char * filename = argv[i];
            const int fd = open(filename, O_RDONLY);
            if (fd < 0)
                std::cerr << "#Error opening file " << filename << ": error #" << errno << ": " << strerror(errno) << std::endl;
            else
            {
                struct epoll_event event;
                event.events = EPOLLIN;
                event.data.fd = fd;
                const int result = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &event);
                if(result == -1) std::cerr << "epoll_ctl add for fd " << fd << "(" << filename << ") failed: " << strerror(errno) << std::endl;
                else std::cerr << "Added fd " << fd << " (" << filename << ") to epoll!" << std::endl;
            }
        }
    }

    struct epoll_event events[max_events];
    while(int event_count = epoll_wait(epoll_fd, events, max_events, -1))
    {
        for (int i = 0; i < event_count; i++)
        {
            const std::string line = readline(events[i].data.fd, 512);                      
            if(line.length() > 0)
                std::cout << line << std::endl;
        }
    }
    return 0;
}

Я заметил это:

  • Когда я просто использую трубу ./repeat, все работает как задумано.
  • Когда я просто использую процесс подстановки, все работает как задумано.
  • Когда я инкапсулирую pv, используя подстановку процессов, все работает как задумано.
  • Однако, когда я использую определенную конструкцию, я теряю данные (отдельные символы) из stdin!

Я пробовал следующее:

  • Я попытался отключить буферизацию в канале между всеми процессами pvи ./repeatиспользовать их stdbuf -i0 -o0 -e0, но это, похоже, не работает.
  • Я обменял epoll на опрос, не работает.
  • Когда я смотрю на поток между pvи ./repeatс tee stream.csv, это выглядит правильно.
  • Раньше я straceвидел, что происходит, и я вижу много однобайтовых операций чтения (как и ожидалось), и они также показывают, что данные пропадают.

Интересно, что происходит? Или что я могу сделать, чтобы продолжить расследование?

Роэл Баардман
источник

Ответы:

16

Потому что ncкоманда внутри <(...)также будет читать из стандартного ввода.

Более простой пример:

$ nc -l 9999 >/tmp/foo &
[1] 5659

$ echo text | cat <(nc -N localhost 9999) -
[1]+  Done                    nc -l 9999 > /tmp/foo

Куда делись text? Через netcat.

$ cat /tmp/foo
text

Ваша программа и ncпобороться за тот же stdin, и ncполучает некоторые из них.

mosvy
источник
Ты прав! Благодарность! Можете ли вы предложить чистый способ отключения стандартного ввода в <(...)? Есть ли лучший способ, чем <( 0<&- ...)?
Роэль Баардман
5
<(... </dev/null), не используйте 0<&-: это приведет к тому, что первый open(2)вернется 0как новый fd. Если ваш ncподдерживает его, вы также можете использовать эту -dопцию.
Мосви
3

Функция epoll () или poll (), возвращаемая с помощью E / POLLIN, сообщит вам только о том, что одно чтение () может не блокироваться.

Не то чтобы вы могли делать много однобайтовых операций чтения () до новой строки, как и вы.

Я говорю, может, потому что read () после epoll (), возвращенного с E / POLLIN, все еще может блокироваться.

Ваш код также будет пытаться прочитать последние EOF и полностью игнорирует любые ошибки read ().

pizdelect
источник
Несмотря на то, что это не является прямым решением моей проблемы, спасибо за комментарий. Я понимаю, что в этом коде есть недостатки, и обнаружение EOF присутствует в менее урезанной версии (благодаря использованию POLLHUP / POLLNVAL). Я все же борюсь с поиском небуферизованного способа чтения строк из нескольких файловых дескрипторов. Моя repeatпрограмма по сути обрабатывает данные NMEA (линейные и без указателей длины) из нескольких источников. Поскольку я объединяю данные из нескольких живых источников, я бы хотел, чтобы мое решение было небуферизованным. Можете ли вы предложить более эффективный способ сделать это?
Роэль Баардман
Между прочим, выполнение системного вызова (чтение) для каждого байта является наименее эффективным способом. Проверка EOF может быть сделана просто проверкой возвращаемого значения read, нет необходимости в POLLHUP (и POLLNVAL будет возвращено, только если вы передадите ему фиктивный fd, а не EOF). Но в любом случае, следите за обновлениями. У меня есть идея ypeeутилиты, которая читает из нескольких файлов и смешивает их в другой файл, сохраняя записи (сохраняя строки нетронутыми).
pizdelect
Я заметил, что эта конструкция bash должна делать это, но я не знаю, как объединить в ней stdin: { cmd1 & cmd2 & cmd3; } > fileфайл будет содержать то, что вы описываете. Однако в моем случае я запускаю все из tcpserver (3), поэтому я хочу также включить stdin (который содержит данные клиента). Я не уверен, как это сделать.
Роэль Баардман
1
Это зависит от того, что такое cmd1, cmd2, .... Если они nc или cat и ваши данные ориентированы на строки, выходные данные могут быть искажены - вы получите строки, состоящие из начала строки, напечатанного cmd1, и конца строки, напечатанного cmd2.
pizdelect