Неблокирующее чтение на подпроцесс. PIPE в Python

507

Я использую модуль подпроцесса, чтобы запустить подпроцесс и подключиться к его выходному потоку (stdout). Я хочу иметь возможность выполнять неблокирующие чтения на своем стандартном выводе. Есть ли способ сделать .readline неблокирующим или проверить, есть ли данные в потоке, прежде чем я вызову .readline? Я хотел бы, чтобы это было переносимо или, по крайней мере, работало под Windows и Linux.

вот как я делаю это сейчас (это блокирует, .readlineесли нет доступных данных):

p = subprocess.Popen('myprogram.exe', stdout = subprocess.PIPE)
output_str = p.stdout.readline()
Матье Паге
источник
14
(Исходя из Google?) Все ТРУБЫ будут заблокированы, когда один из буферов ТРУБ заполнится и не будет считан. например, тупик stdout при заполнении stderr. Никогда не передавайте ТРУБУ, которую вы не собираетесь читать.
Насер аль-Вохайби
@ NasserAl-Wohaibi, значит, лучше всегда создавать файлы?
Чарли Паркер
что-то, что мне было любопытно понять, почему это блокирование в первую очередь ... Я спрашиваю, потому что я видел комментарий:To avoid deadlocks: careful to: add \n to output, flush output, use readline() rather than read()
Чарли Паркер
Это «по замыслу», ожидая получения входных данных.
Матье Паге
связанные: stackoverflow.com/q/19880190/240515
user240515

Ответы:

403

fcntl, select, asyncprocНе поможет в этом случае.

Надежный способ чтения потока без блокировки независимо от операционной системы заключается в использовании Queue.get_nowait():

import sys
from subprocess import PIPE, Popen
from threading  import Thread

try:
    from queue import Queue, Empty
except ImportError:
    from Queue import Queue, Empty  # python 2.x

ON_POSIX = 'posix' in sys.builtin_module_names

def enqueue_output(out, queue):
    for line in iter(out.readline, b''):
        queue.put(line)
    out.close()

p = Popen(['myprogram.exe'], stdout=PIPE, bufsize=1, close_fds=ON_POSIX)
q = Queue()
t = Thread(target=enqueue_output, args=(p.stdout, q))
t.daemon = True # thread dies with the program
t.start()

# ... do other things here

# read line without blocking
try:  line = q.get_nowait() # or q.get(timeout=.1)
except Empty:
    print('no output yet')
else: # got line
    # ... do something with line
JFS
источник
6
Да, это работает для меня, я удалил много, хотя. Это включает в себя хорошие практики, но не всегда необходимо. Python 3.x 2.X compat и close_fds могут быть опущены, он все равно будет работать. Но просто знайте, что все делает, и не копируйте это вслепую, даже если это просто работает! (На самом деле самое простое решение - это использовать поток и выполнить readline, как это сделал Себ, Qeues - это просто простой способ получить данные, есть и другие, потоки - это ответ!)
Aki
3
Внутри потока вызывается out.readlineблокировка потока и основного потока, и мне приходится ждать, пока readline вернется, прежде чем все остальное продолжится. Есть ли простой способ обойти это? (Я читаю несколько строк из моего процесса, который также является еще одним .py файлом, который выполняет DB и все такое)
Джастин,
3
@Justin: 'out.readline' не блокирует основной поток, он выполняется в другом потоке.
Jfs
4
Что делать, если я не могу закрыть подпроцесс, например. из-за исключений? поток stdout-reader не умрет, а python зависнет, даже если основной поток вышел, не так ли? как можно обойти это? Python 2.x не поддерживает уничтожение потоков, что еще хуже, не поддерживает их прерывание. :( (очевидно, нужно обработать исключения, чтобы убедиться, что подпроцесс выключен, но на случай, если это не так, что вы можете сделать?)
n611x007
3
Я создал несколько дружественных оболочек этого в пакете shelljob pypi.python.org/pypi/shelljob
edA-qa mort-ora-y
77

У меня часто была похожая проблема; Программы на Python, которые я пишу часто, должны иметь возможность выполнять некоторые основные функции, одновременно принимая пользовательский ввод из командной строки (stdin). Простое помещение функции обработки пользовательского ввода в другой поток не решает проблему, поскольку readline()блокирует и не имеет времени ожидания. Если основная функциональность завершена и больше нет необходимости ждать дальнейшего ввода данных пользователем, я обычно хочу, чтобы моя программа завершала работу, но это не может быть, потому что readline()все еще блокируется в другом потоке, ожидая строки. Решение, которое я нашел для этой проблемы, состоит в том, чтобы сделать stdin неблокирующим файлом с помощью модуля fcntl:

import fcntl
import os
import sys

# make stdin a non-blocking file
fd = sys.stdin.fileno()
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)

# user input handling thread
while mainThreadIsRunning:
      try: input = sys.stdin.readline()
      except: continue
      handleInput(input)

На мой взгляд, это немного чище, чем использование модулей select или signal для решения этой проблемы, но опять же, это работает только в UNIX ...

Джесси
источник
1
Согласно документации, fcntl () может получить либо дескриптор файла, либо объект, имеющий метод .fileno ().
Денилсон Са Майя
10
Ответ Джесси не верен. Согласно Гвидо, readline не работает правильно с неблокирующим режимом, и не будет до Python 3000. bugs.python.org/issue1175#msg56041 Если вы хотите использовать fcntl, чтобы установить файл в неблокирующий режим, Вы должны использовать os.read () нижнего уровня и отделить строки самостоятельно. Смешивание fcntl с высокоуровневыми вызовами, которые выполняют буферизацию строки, вызывает проблемы.
anonnn
2
Использование readline кажется неправильным в Python 2. См. Ответ anonnn stackoverflow.com/questions/375427/…
Catalin Iacob
10
Пожалуйста, не используйте занятые петли. Используйте poll () с таймаутом для ожидания данных.
Иво Данихелка
@ Стефано, что buffer_sizeопределяется как?
кот
39

Python 3.4 представляет новый временный API для асинхронного IO- asyncioмодуля .

Подход похож на twistedответ на основе @Bryan Ward - определите протокол, и его методы вызываются, как только данные готовы:

#!/usr/bin/env python3
import asyncio
import os

class SubprocessProtocol(asyncio.SubprocessProtocol):
    def pipe_data_received(self, fd, data):
        if fd == 1: # got stdout data (bytes)
            print(data)

    def connection_lost(self, exc):
        loop.stop() # end loop.run_forever()

if os.name == 'nt':
    loop = asyncio.ProactorEventLoop() # for subprocess' pipes on Windows
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(loop.subprocess_exec(SubprocessProtocol, 
        "myprogram.exe", "arg1", "arg2"))
    loop.run_forever()
finally:
    loop.close()

См. «Подпроцесс» в документации .

Существует высокоуровневый интерфейс, asyncio.create_subprocess_exec()который возвращает Processобъекты, которые позволяют асинхронно читать строки, используя StreamReader.readline()сопрограммусинтаксисом async/ awaitPython 3.5+ ):

#!/usr/bin/env python3.5
import asyncio
import locale
import sys
from asyncio.subprocess import PIPE
from contextlib import closing

async def readline_and_kill(*args):
    # start child process
    process = await asyncio.create_subprocess_exec(*args, stdout=PIPE)

    # read line (sequence of bytes ending with b'\n') asynchronously
    async for line in process.stdout:
        print("got line:", line.decode(locale.getpreferredencoding(False)))
        break
    process.kill()
    return await process.wait() # wait for the child process to exit


if sys.platform == "win32":
    loop = asyncio.ProactorEventLoop()
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()

with closing(loop):
    sys.exit(loop.run_until_complete(readline_and_kill(
        "myprogram.exe", "arg1", "arg2")))

readline_and_kill() выполняет следующие задачи:

  • запустить подпроцесс, перенаправить его стандартный поток в канал
  • читать строку из stdout подпроцесса асинхронно
  • убить подпроцесс
  • дождитесь его выхода

Каждый шаг может быть ограничен тайм-аутом секунд, если это необходимо.

JFS
источник
Когда я пытаюсь сделать что-то подобное с использованием сопрограмм Python 3.4, я получаю вывод только после того, как весь скрипт будет запущен. Я хотел бы видеть напечатанную строку вывода, как только подпроцесс напечатает строку. Вот что у меня есть: pastebin.com/qPssFGep .
flutefreak7
1
@ flutefreak7: проблемы с буферизацией не связаны с текущим вопросом. Перейдите по ссылке для возможных решений.
Jfs
Спасибо! Решил проблему для моего сценария, просто используя print(text, flush=True)так, чтобы напечатанный текст был немедленно доступен для вызова наблюдателя readline. Когда я тестировал его с помощью исполняемого файла на Фортране, я на самом деле хочу обернуть / посмотреть, он не буферизует вывод, поэтому он ведет себя как ожидалось.
flutefreak7
Можно ли разрешить подпроцессу сохраняться и выполнять дальнейшие операции чтения / записи. readline_and_killво втором сценарии работает очень похоже subprocess.comunicateна то, что завершает процесс после одной операции чтения / записи. Я также вижу, что вы используете один канал stdout, который подпроцесс обрабатывает как неблокирующий. Пытаясь использовать оба, stdoutи stderr я обнаружил, что в конечном итоге блокирует .
Карел
@Carel код в ответе работает как задумано, как описано в ответе явно. При желании можно реализовать другое поведение. Оба канала одинаково неблокируют, если их использовать, вот пример, как читать с обоих каналов одновременно .
JFS
19

Попробуйте модуль asyncproc . Например:

import os
from asyncproc import Process
myProc = Process("myprogram.app")

while True:
    # check to see if process has ended
    poll = myProc.wait(os.WNOHANG)
    if poll != None:
        break
    # print any new output
    out = myProc.read()
    if out != "":
        print out

Модуль заботится обо всех потоках, как это было предложено S.Lott.

Ной
источник
1
Абсолютно блестящий. Гораздо проще, чем необработанный модуль подпроцесса. У меня отлично работает на Ubuntu.
Cerin
12
asyncproc не работает на Windows, и Windows не поддерживает os.WNOHANG :-(
Брайан Оукли
26
asyncproc - это GPL, что еще больше ограничивает его использование :-(
Брайан Оукли
Спасибо. Одна маленькая вещь: кажется, что замена as с 8 пробелами в asyncproc.py - это путь :)
benjaoming
Не похоже, что вы можете получить код возврата запускаемого вами процесса через модуль asyncproc; только результат, который это произвело.
Грайай
17

Вы можете сделать это очень легко в Twisted . В зависимости от существующей кодовой базы это может быть не так просто использовать, но если вы создаете скрученное приложение, такие вещи становятся почти тривиальными. Вы создаете ProcessProtocolкласс и переопределяете outReceived()метод. Twisted (в зависимости от используемого реактора) обычно представляет собой большой select()цикл с обратными вызовами, установленными для обработки данных из различных файловых дескрипторов (часто сетевых сокетов). Таким образом, outReceived()метод просто устанавливает обратный вызов для обработки данных, поступающих из STDOUT. Простой пример, демонстрирующий это поведение:

from twisted.internet import protocol, reactor

class MyProcessProtocol(protocol.ProcessProtocol):

    def outReceived(self, data):
        print data

proc = MyProcessProtocol()
reactor.spawnProcess(proc, './myprogram', ['./myprogram', 'arg1', 'arg2', 'arg3'])
reactor.run()

В документации Twisted есть хорошая информация об этом.

Если вы строите все свое приложение на Twisted, оно делает асинхронную связь с другими процессами, локальными или удаленными, действительно элегантной. С другой стороны, если ваша программа построена не на Twisted, это не очень полезно. Надеемся, что это может быть полезно для других читателей, даже если это не применимо для вашего конкретного приложения.

Брайан Уорд
источник
не хорошо. selectне должен работать на окнах с файловыми дескрипторами, в соответствии с документами
n611x007
2
@naxa Не думаю, что select()он имеет в виду то же, что и ты. Я предполагаю это, потому что Twistedработает на окнах ...
notbad.jpeg
1
«Скрученный (в зависимости от используемого реактора) обычно представляет собой большой цикл выбора ()» означает, что есть несколько реакторов на выбор. Один из select()них является наиболее переносимым для юниксов и юникс-лайков, но для Windows также доступны два реактора: twistedmatrix.com/documents/current/core/howto/…
clacke
14

Используйте select & read (1).

import subprocess     #no new requirements
def readAllSoFar(proc, retVal=''): 
  while (select.select([proc.stdout],[],[],0)[0]!=[]):   
    retVal+=proc.stdout.read(1)
  return retVal
p = subprocess.Popen(['/bin/ls'], stdout=subprocess.PIPE)
while not p.poll():
  print (readAllSoFar(p))

Для readline () - как:

lines = ['']
while not p.poll():
  lines = readAllSoFar(p, lines[-1]).split('\n')
  for a in range(len(lines)-1):
    print a
lines = readAllSoFar(p, lines[-1]).split('\n')
for a in range(len(lines)-1):
  print a
Энди Джексон
источник
6
не хорошо. selectне должен работать на окнах с файловыми дескрипторами, в соответствии с документами
n611x007
О, МОЙ БОГ. Читайте мегабайты или, возможно, гигабайты по одному символу за раз ... это худшая идея, которую я видел за долгое время ... не говоря уже о том, что этот код не работает, потому что proc.stdout.read()независимо от того, насколько мал аргумент блокирующий вызов.
wvxvw
OSError: [WinError 10093] Either the application has not called WSAStartup, or WSAStartup failed
nmz787
8

Одним из решений является создание другого процесса для выполнения чтения процесса или создание потока процесса с таймаутом.

Вот многопоточная версия функции тайм-аута:

http://code.activestate.com/recipes/473878/

Тем не менее, вам нужно прочитать стандартный вывод по мере его поступления? Другим решением может быть выгрузка вывода в файл и ожидание завершения процесса с помощью p.wait () .

f = open('myprogram_output.txt','w')
p = subprocess.Popen('myprogram.exe', stdout=f)
p.wait()
f.close()


str = open('myprogram_output.txt','r').read()
monkut
источник
похоже, что поток recpie не завершится после истечения тайм-аута, и его уничтожение зависит от способности убить подпроцесс (например, не связанный с этим), который он читает (вещь, которую вы должны быть в состоянии, но на всякий случай, если вы не можете ..) ,
n611x007
7

Отказ от ответственности: это работает только для торнадо

Вы можете сделать это, установив fd как неблокирующее, а затем использовать ioloop для регистрации обратных вызовов. Я упаковал это в яйцо под названием tornado_subprocess, и вы можете установить его через PyPI:

easy_install tornado_subprocess

Теперь вы можете сделать что-то вроде этого:

import tornado_subprocess
import tornado.ioloop

    def print_res( status, stdout, stderr ) :
    print status, stdout, stderr
    if status == 0:
        print "OK:"
        print stdout
    else:
        print "ERROR:"
        print stderr

t = tornado_subprocess.Subprocess( print_res, timeout=30, args=[ "cat", "/etc/passwd" ] )
t.start()
tornado.ioloop.IOLoop.instance().start()

Вы также можете использовать его с RequestHandler

class MyHandler(tornado.web.RequestHandler):
    def on_done(self, status, stdout, stderr):
        self.write( stdout )
        self.finish()

    @tornado.web.asynchronous
    def get(self):
        t = tornado_subprocess.Subprocess( self.on_done, timeout=30, args=[ "cat", "/etc/passwd" ] )
        t.start()
Вукасин Тороман
источник
Спасибо за хорошую функцию! Просто чтобы уточнить, почему мы не можем просто использовать threading.Threadдля создания новых неблокирующих процессов? Я использовал его в on_messageэкземпляре Tornado Websocket, и он отлично справился со своей задачей.
VisioN
1
нить в основном не рекомендуется в торнадо. они хороши для небольших коротких функций. Вы можете прочитать об этом здесь: stackoverflow.com/questions/7846323/tornado-web-and-threads github.com/facebook/tornado/wiki/Threading-and-concurrency
Вукасин Тороман
@VukasinToroman ты действительно спас меня здесь с этим. Большое спасибо за модуль tornado_subprocess :)
Джеймс Гентес
это работает на окнах? (обратите внимание, что selectс файловыми дескрипторами этого не происходит )
n611x007
Эта библиотека не использует selectвызов. Я не пробовал это под Windows, но вы, вероятно, столкнетесь с проблемами, так как библиотека использует fcntlмодуль. Итак, вкратце: нет, это, вероятно, не будет работать под Windows.
Вукасин Тороман
6

Существующие решения не работают для меня (подробности ниже). Наконец, работало, чтобы реализовать readline с использованием read (1) (основываясь на этом ответе ). Последний не блокирует:

from subprocess import Popen, PIPE
from threading import Thread
def process_output(myprocess): #output-consuming thread
    nextline = None
    buf = ''
    while True:
        #--- extract line using read(1)
        out = myprocess.stdout.read(1)
        if out == '' and myprocess.poll() != None: break
        if out != '':
            buf += out
            if out == '\n':
                nextline = buf
                buf = ''
        if not nextline: continue
        line = nextline
        nextline = None

        #--- do whatever you want with line here
        print 'Line is:', line
    myprocess.stdout.close()

myprocess = Popen('myprogram.exe', stdout=PIPE) #output-producing process
p1 = Thread(target=process_output, args=(dcmpid,)) #output-consuming thread
p1.daemon = True
p1.start()

#--- do whatever here and then kill process and thread if needed
if myprocess.poll() == None: #kill process; will automatically stop thread
    myprocess.kill()
    myprocess.wait()
if p1 and p1.is_alive(): #wait for thread to finish
    p1.join()

Почему существующие решения не работают:

  1. Решения, которые требуют readline (включая решения на основе очереди), всегда блокируются. Трудно (невозможно?) Убить поток, который выполняет readline. Он будет уничтожен только после завершения процесса, который его создал, но не когда процесс, производящий вывод, будет уничтожен.
  2. Смешивание низкоуровневого fcntl с высокоуровневыми вызовами readline может работать неправильно, как указывал anonnn.
  3. Использование select.poll () удобно, но не работает в Windows в соответствии с документами Python.
  4. Использование сторонних библиотек кажется излишним для этой задачи и добавляет дополнительные зависимости.
Викрам Пуди
источник
1
1. q.get_nowait()от моего ответа никогда не должны блокировать, вот в чем смысл его использования. 2. Поток, который выполняет readline ( enqueue_output()функцию ), завершается в EOF, например, включая случай, когда процесс, производящий вывод, завершается. Если вы верите, что это не так; Пожалуйста, предоставьте полный пример минимального кода, который показывает иначе (возможно, как новый вопрос ).
Jfs
1
@sebastian Я потратил час или больше, пытаясь придумать минимальный пример. В конце концов, я должен согласиться, что ваш ответ обрабатывает все случаи. Я предполагаю, что раньше это не работало для меня, потому что, когда я пытался убить процесс создания вывода, он уже был уничтожен и дал ошибку, которую трудно отладить. Час был хорошо проведен, потому что, придумав минимальный пример, я мог бы найти более простое решение.
Викрам Пуди
Не могли бы вы опубликовать более простое решение? :) (если он отличается от Себастьяна)
n611x007
@ danger89: я думаю dcmpid = myprocess.
ViFI
В условии после вызова read () (только после того, как True): out никогда не будет пустой строкой, потому что вы читаете по крайней мере строку / байты длиной 1.
sergzach
6

Вот мой код, используемый для перехвата всех выходных данных подпроцесса ASAP, включая частичные строки. Он качает одновременно и stdout и stderr в почти правильном порядке.

Протестировано и правильно работает на Python 2.7 Linux & Windows.

#!/usr/bin/python
#
# Runner with stdout/stderr catcher
#
from sys import argv
from subprocess import Popen, PIPE
import os, io
from threading import Thread
import Queue
def __main__():
    if (len(argv) > 1) and (argv[-1] == "-sub-"):
        import time, sys
        print "Application runned!"
        time.sleep(2)
        print "Slept 2 second"
        time.sleep(1)
        print "Slept 1 additional second",
        time.sleep(2)
        sys.stderr.write("Stderr output after 5 seconds")
        print "Eol on stdin"
        sys.stderr.write("Eol on stderr\n")
        time.sleep(1)
        print "Wow, we have end of work!",
    else:
        os.environ["PYTHONUNBUFFERED"]="1"
        try:
            p = Popen( argv + ["-sub-"],
                       bufsize=0, # line-buffered
                       stdin=PIPE, stdout=PIPE, stderr=PIPE )
        except WindowsError, W:
            if W.winerror==193:
                p = Popen( argv + ["-sub-"],
                           shell=True, # Try to run via shell
                           bufsize=0, # line-buffered
                           stdin=PIPE, stdout=PIPE, stderr=PIPE )
            else:
                raise
        inp = Queue.Queue()
        sout = io.open(p.stdout.fileno(), 'rb', closefd=False)
        serr = io.open(p.stderr.fileno(), 'rb', closefd=False)
        def Pump(stream, category):
            queue = Queue.Queue()
            def rdr():
                while True:
                    buf = stream.read1(8192)
                    if len(buf)>0:
                        queue.put( buf )
                    else:
                        queue.put( None )
                        return
            def clct():
                active = True
                while active:
                    r = queue.get()
                    try:
                        while True:
                            r1 = queue.get(timeout=0.005)
                            if r1 is None:
                                active = False
                                break
                            else:
                                r += r1
                    except Queue.Empty:
                        pass
                    inp.put( (category, r) )
            for tgt in [rdr, clct]:
                th = Thread(target=tgt)
                th.setDaemon(True)
                th.start()
        Pump(sout, 'stdout')
        Pump(serr, 'stderr')

        while p.poll() is None:
            # App still working
            try:
                chan,line = inp.get(timeout = 1.0)
                if chan=='stdout':
                    print "STDOUT>>", line, "<?<"
                elif chan=='stderr':
                    print " ERROR==", line, "=?="
            except Queue.Empty:
                pass
        print "Finish"

if __name__ == '__main__':
    __main__()
datacompboy
источник
Один из немногих ответов, которые позволят вам прочитать материал, который не обязательно заканчивается новой строкой.
Тотам
5

Я добавляю эту проблему для чтения некоторого подпроцесса. Откройте стандартный вывод. Вот мое не блокирующее решение для чтения:

import fcntl

def non_block_read(output):
    fd = output.fileno()
    fl = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
    try:
        return output.read()
    except:
        return ""

# Use example
from subprocess import *
sb = Popen("echo test && sleep 1000", shell=True, stdout=PIPE)
sb.kill()

# sb.stdout.read() # <-- This will block
non_block_read(sb.stdout)
'test\n'
Себастьян Клэйс
источник
5
fcntl не работает на окнах, согласно документации .
n611x007
@anatolytechtonik использовать msvcrt.kbhit()вместо
кошка
4

Эта версия неблокируемого чтения не требует специальных модулей и будет работать "из коробки" на большинстве дистрибутивов Linux.

import os
import sys
import time
import fcntl
import subprocess

def async_read(fd):
    # set non-blocking flag while preserving old flags
    fl = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
    # read char until EOF hit
    while True:
        try:
            ch = os.read(fd.fileno(), 1)
            # EOF
            if not ch: break                                                                                                                                                              
            sys.stdout.write(ch)
        except OSError:
            # waiting for data be available on fd
            pass

def shell(args, async=True):
    # merge stderr and stdout
    proc = subprocess.Popen(args, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    if async: async_read(proc.stdout)
    sout, serr = proc.communicate()
    return (sout, serr)

if __name__ == '__main__':
    cmd = 'ping 8.8.8.8'
    sout, serr = shell(cmd.split())
Том лайм
источник
3

Вот простое решение на основе потоков, которое:

  • работает как на Linux, так и на Windows (не полагаясь на select ).
  • читает как stdoutиstderr асинхронно.
  • не полагается на активный опрос с произвольным временем ожидания (дружественный к ЦП).
  • не использует asyncio(что может конфликтовать с другими библиотеками).
  • выполняется до тех пор, пока дочерний процесс не завершится.

printer.py

import time
import sys

sys.stdout.write("Hello\n")
sys.stdout.flush()
time.sleep(1)
sys.stdout.write("World!\n")
sys.stdout.flush()
time.sleep(1)
sys.stderr.write("That's an error\n")
sys.stderr.flush()
time.sleep(2)
sys.stdout.write("Actually, I'm fine\n")
sys.stdout.flush()
time.sleep(1)

reader.py

import queue
import subprocess
import sys
import threading


def enqueue_stream(stream, queue, type):
    for line in iter(stream.readline, b''):
        queue.put(str(type) + line.decode('utf-8'))
    stream.close()


def enqueue_process(process, queue):
    process.wait()
    queue.put('x')


p = subprocess.Popen('python printer.py', stdout=subprocess.PIPE, stderr=subprocess.PIPE)
q = queue.Queue()
to = threading.Thread(target=enqueue_stream, args=(p.stdout, q, 1))
te = threading.Thread(target=enqueue_stream, args=(p.stderr, q, 2))
tp = threading.Thread(target=enqueue_process, args=(p, q))
te.start()
to.start()
tp.start()

while True:
    line = q.get()
    if line[0] == 'x':
        break
    if line[0] == '2':  # stderr
        sys.stdout.write("\033[0;31m")  # ANSI red color
    sys.stdout.write(line[1:])
    if line[0] == '2':
        sys.stdout.write("\033[0m")  # reset ANSI code
    sys.stdout.flush()

tp.join()
to.join()
te.join()
Оливье Мишель
источник
2

Добавление этого ответа здесь, поскольку он предоставляет возможность устанавливать неблокирующие каналы в Windows и Unix.

Все ctypesдетали благодаря ответу @ techtonik .

Существует немного измененная версия, которая будет использоваться как в системах Unix, так и в Windows.

  • Совместим с Python3 (требуется лишь незначительное изменение) .
  • Включает версию posix и определяет исключение для использования для любого из них.

Таким образом, вы можете использовать ту же функцию и исключение для кода Unix и Windows.

# pipe_non_blocking.py (module)
"""
Example use:

    p = subprocess.Popen(
            command,
            stdout=subprocess.PIPE,
            )

    pipe_non_blocking_set(p.stdout.fileno())

    try:
        data = os.read(p.stdout.fileno(), 1)
    except PortableBlockingIOError as ex:
        if not pipe_non_blocking_is_error_blocking(ex):
            raise ex
"""


__all__ = (
    "pipe_non_blocking_set",
    "pipe_non_blocking_is_error_blocking",
    "PortableBlockingIOError",
    )

import os


if os.name == "nt":
    def pipe_non_blocking_set(fd):
        # Constant could define globally but avoid polluting the name-space
        # thanks to: /programming/34504970
        import msvcrt

        from ctypes import windll, byref, wintypes, WinError, POINTER
        from ctypes.wintypes import HANDLE, DWORD, BOOL

        LPDWORD = POINTER(DWORD)

        PIPE_NOWAIT = wintypes.DWORD(0x00000001)

        def pipe_no_wait(pipefd):
            SetNamedPipeHandleState = windll.kernel32.SetNamedPipeHandleState
            SetNamedPipeHandleState.argtypes = [HANDLE, LPDWORD, LPDWORD, LPDWORD]
            SetNamedPipeHandleState.restype = BOOL

            h = msvcrt.get_osfhandle(pipefd)

            res = windll.kernel32.SetNamedPipeHandleState(h, byref(PIPE_NOWAIT), None, None)
            if res == 0:
                print(WinError())
                return False
            return True

        return pipe_no_wait(fd)

    def pipe_non_blocking_is_error_blocking(ex):
        if not isinstance(ex, PortableBlockingIOError):
            return False
        from ctypes import GetLastError
        ERROR_NO_DATA = 232

        return (GetLastError() == ERROR_NO_DATA)

    PortableBlockingIOError = OSError
else:
    def pipe_non_blocking_set(fd):
        import fcntl
        fl = fcntl.fcntl(fd, fcntl.F_GETFL)
        fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
        return True

    def pipe_non_blocking_is_error_blocking(ex):
        if not isinstance(ex, PortableBlockingIOError):
            return False
        return True

    PortableBlockingIOError = BlockingIOError

Чтобы избежать чтения неполных данных, я написал собственный генератор readline (который возвращает строку байтов для каждой строки).

Это генератор, так что вы можете, например ...

def non_blocking_readlines(f, chunk=1024):
    """
    Iterate over lines, yielding b'' when nothings left
    or when new data is not yet available.

    stdout_iter = iter(non_blocking_readlines(process.stdout))

    line = next(stdout_iter)  # will be a line or b''.
    """
    import os

    from .pipe_non_blocking import (
            pipe_non_blocking_set,
            pipe_non_blocking_is_error_blocking,
            PortableBlockingIOError,
            )

    fd = f.fileno()
    pipe_non_blocking_set(fd)

    blocks = []

    while True:
        try:
            data = os.read(fd, chunk)
            if not data:
                # case were reading finishes with no trailing newline
                yield b''.join(blocks)
                blocks.clear()
        except PortableBlockingIOError as ex:
            if not pipe_non_blocking_is_error_blocking(ex):
                raise ex

            yield b''
            continue

        while True:
            n = data.find(b'\n')
            if n == -1:
                break

            yield b''.join(blocks) + data[:n + 1]
            data = data[n + 1:]
            blocks.clear()
        blocks.append(data)
ideasman42
источник
(1) этот комментарий указывает, что readline()не работает с неблокирующими каналами (такими как set using fcntl) на Python 2 - как вы думаете, это больше не правильно? (мой ответ содержит ссылку ( fcntl), которая предоставляет ту же информацию, но теперь кажется, что она удалена). (2) Посмотрите, как multiprocessing.connection.PipeиспользуетсяSetNamedPipeHandleState
JFS
Я проверял это только на Python3. Но видел эту информацию тоже и ожидаю, что она остается в силе. Я также написал свой собственный код для использования вместо readline, я обновил свой ответ, чтобы включить его.
ideasman42
2

У меня проблема с оригинальным вопросником, но я не хотел вызывать темы. Я смешал решение Джесси с прямым read () из канала и своим собственным обработчиком буфера для чтения строк (однако, мой подпроцесс - ping - всегда записывал полные строки <размер системной страницы). Я избегаю ожидания, занятого только чтением в часах, зарегистрированных в gobject. В эти дни я обычно запускаю код внутри главного цикла gobject, чтобы избежать потоков.

def set_up_ping(ip, w):
# run the sub-process
# watch the resultant pipe
p = subprocess.Popen(['/bin/ping', ip], stdout=subprocess.PIPE)
# make stdout a non-blocking file
fl = fcntl.fcntl(p.stdout, fcntl.F_GETFL)
fcntl.fcntl(p.stdout, fcntl.F_SETFL, fl | os.O_NONBLOCK)
stdout_gid = gobject.io_add_watch(p.stdout, gobject.IO_IN, w)
return stdout_gid # for shutting down

Наблюдатель

def watch(f, *other):
print 'reading',f.read()
return True

И основная программа устанавливает пинг, а затем вызывает gobject mail loop.

def main():
set_up_ping('192.168.1.8', watch)
# discard gid as unused here
gobject.MainLoop().run()

Любая другая работа привязана к обратным вызовам в gobject.

Dave Kitchen
источник
2

В современном Python дела обстоят намного лучше.

Вот простая дочерняя программа "hello.py":

#!/usr/bin/env python3

while True:
    i = input()
    if i == "quit":
        break
    print(f"hello {i}")

И программа для взаимодействия с ним:

import asyncio


async def main():
    proc = await asyncio.subprocess.create_subprocess_exec(
        "./hello.py", stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE
    )
    proc.stdin.write(b"bob\n")
    print(await proc.stdout.read(1024))
    proc.stdin.write(b"alice\n")
    print(await proc.stdout.read(1024))
    proc.stdin.write(b"quit\n")
    await proc.wait()


asyncio.run(main())

Это распечатывает:

b'hello bob\n'
b'hello alice\n'

Обратите внимание, что фактический шаблон, который также содержится почти во всех предыдущих ответах, как здесь, так и в связанных вопросах, состоит в том, чтобы установить дескриптор файла дочернего элемента stdout на неблокирующее, а затем опрашивать его в каком-то цикле выбора. В эти дни, конечно, этот цикл предоставляется asyncio.

user240515
источник
1

выберите помогает определить, где находится следующий полезный ввод.

Тем не менее, вы почти всегда счастливее с отдельными темами. Один выполняет блокировку чтения стандартного ввода, другой - везде, где вы не хотите блокировать.

С. Лотт
источник
11
Я думаю, что этот ответ бесполезен по двум причинам: (a) модуль select не будет работать на каналах под Windows (как четко указано в предоставленной ссылке), что противоречит намерениям OP иметь портативное решение. (b) Асинхронные потоки не допускают синхронного диалога между родительским и дочерним процессами. Что, если родительский процесс хочет отправить следующее действие в соответствии со следующей строкой, прочитанной дочерним элементом ?!
ThomasH
4
select также бесполезен, так как чтение Python будет блокироваться даже после выбора, поскольку он не имеет стандартной семантики C и не будет возвращать частичные данные.
Хельмут Гроне
Отдельная тема для чтения из выходных данных ребенка решила мою проблему, которая была похожа на эту. Если вам нужно синхронное взаимодействие, я думаю, вы не можете использовать это решение (если вы не знаете, какой результат ожидать). Я бы принял этот ответ
Эмилиано
1

зачем беспокоить поток и очередь? в отличие от readline (), BufferedReader.read1 () не блокирует ожидание \ r \ n, он возвращает как можно скорее, если есть какие-либо выходные данные.

#!/usr/bin/python
from subprocess import Popen, PIPE, STDOUT
import io

def __main__():
    try:
        p = Popen( ["ping", "-n", "3", "127.0.0.1"], stdin=PIPE, stdout=PIPE, stderr=STDOUT )
    except: print("Popen failed"); quit()
    sout = io.open(p.stdout.fileno(), 'rb', closefd=False)
    while True:
        buf = sout.read1(1024)
        if len(buf) == 0: break
        print buf,

if __name__ == '__main__':
    __main__()
mfmain
источник
Будет ли он возвращаться как можно скорее, если ничего не приходит? Если это не так, это блокирует.
Матье Паге
@ MathieuPagé прав. read1будет блокировать, если первые нижележащие блоки чтения, что происходит, когда канал еще открыт, но вход недоступен.
Джек О'Коннор
1

В моем случае мне понадобился модуль журналирования, который ловит выходные данные фоновых приложений и увеличивает их (добавляя метки времени, цвета и т. Д.).

Я закончил с фоновым потоком, который делает фактический ввод / вывод. Следующий код предназначен только для платформ POSIX. Я снял ненужные детали.

Если кто-то собирается использовать этого зверя на долгое время, подумайте об управлении открытыми дескрипторами. В моем случае это не было большой проблемой.

# -*- python -*-
import fcntl
import threading
import sys, os, errno
import subprocess

class Logger(threading.Thread):
    def __init__(self, *modules):
        threading.Thread.__init__(self)
        try:
            from select import epoll, EPOLLIN
            self.__poll = epoll()
            self.__evt = EPOLLIN
            self.__to = -1
        except:
            from select import poll, POLLIN
            print 'epoll is not available'
            self.__poll = poll()
            self.__evt = POLLIN
            self.__to = 100
        self.__fds = {}
        self.daemon = True
        self.start()

    def run(self):
        while True:
            events = self.__poll.poll(self.__to)
            for fd, ev in events:
                if (ev&self.__evt) != self.__evt:
                    continue
                try:
                    self.__fds[fd].run()
                except Exception, e:
                    print e

    def add(self, fd, log):
        assert not self.__fds.has_key(fd)
        self.__fds[fd] = log
        self.__poll.register(fd, self.__evt)

class log:
    logger = Logger()

    def __init__(self, name):
        self.__name = name
        self.__piped = False

    def fileno(self):
        if self.__piped:
            return self.write
        self.read, self.write = os.pipe()
        fl = fcntl.fcntl(self.read, fcntl.F_GETFL)
        fcntl.fcntl(self.read, fcntl.F_SETFL, fl | os.O_NONBLOCK)
        self.fdRead = os.fdopen(self.read)
        self.logger.add(self.read, self)
        self.__piped = True
        return self.write

    def __run(self, line):
        self.chat(line, nl=False)

    def run(self):
        while True:
            try: line = self.fdRead.readline()
            except IOError, exc:
                if exc.errno == errno.EAGAIN:
                    return
                raise
            self.__run(line)

    def chat(self, line, nl=True):
        if nl: nl = '\n'
        else: nl = ''
        sys.stdout.write('[%s] %s%s' % (self.__name, line, nl))

def system(command, param=[], cwd=None, env=None, input=None, output=None):
    args = [command] + param
    p = subprocess.Popen(args, cwd=cwd, stdout=output, stderr=output, stdin=input, env=env, bufsize=0)
    p.wait()

ls = log('ls')
ls.chat('go')
system("ls", ['-l', '/'], output=ls)

date = log('date')
date.chat('go')
system("date", output=date)
Дмитро
источник
1

Моя проблема немного отличается, так как я хотел собрать и stdout, и stderr из запущенного процесса, но, в конечном счете, то же самое, поскольку я хотел визуализировать вывод в виджете как сгенерированный.

Я не хотел прибегать ко многим из предложенных обходных путей с использованием очередей или дополнительных потоков, поскольку они не должны быть необходимы для выполнения такой распространенной задачи, как запуск другого сценария и сбор его выходных данных.

Прочитав предложенные решения и документы по Python, я решил свою проблему с помощью реализации ниже. Да, это работает только для POSIX, так как я использую selectвызов функции.

Я согласен с тем, что документы сбивают с толку, и реализация такой распространенной задачи сценариев неудобна. Я полагаю, что более старые версии python имеют разные значения по умолчанию Popenи разные объяснения, что создавало много путаницы. Кажется, это хорошо работает как для Python 2.7.12, так и для 3.5.2.

Ключ должен был установить bufsize=1буферизацию строки, а затем universal_newlines=Trueобработать как текстовый файл вместо двоичного файла, который, по-видимому, становится настройкой по умолчанию bufsize=1.

class workerThread(QThread):
   def __init__(self, cmd):
      QThread.__init__(self)
      self.cmd = cmd
      self.result = None           ## return code
      self.error = None            ## flag indicates an error
      self.errorstr = ""           ## info message about the error

   def __del__(self):
      self.wait()
      DEBUG("Thread removed")

   def run(self):
      cmd_list = self.cmd.split(" ")   
      try:
         cmd = subprocess.Popen(cmd_list, bufsize=1, stdin=None
                                        , universal_newlines=True
                                        , stderr=subprocess.PIPE
                                        , stdout=subprocess.PIPE)
      except OSError:
         self.error = 1
         self.errorstr = "Failed to execute " + self.cmd
         ERROR(self.errorstr)
      finally:
         VERBOSE("task started...")
      import select
      while True:
         try:
            r,w,x = select.select([cmd.stdout, cmd.stderr],[],[])
            if cmd.stderr in r:
               line = cmd.stderr.readline()
               if line != "":
                  line = line.strip()
                  self.emit(SIGNAL("update_error(QString)"), line)
            if cmd.stdout in r:
               line = cmd.stdout.readline()
               if line == "":
                  break
               line = line.strip()
               self.emit(SIGNAL("update_output(QString)"), line)
         except IOError:
            pass
      cmd.wait()
      self.result = cmd.returncode
      if self.result < 0:
         self.error = 1
         self.errorstr = "Task terminated by signal " + str(self.result)
         ERROR(self.errorstr)
         return
      if self.result:
         self.error = 1
         self.errorstr = "exit code " + str(self.result)
         ERROR(self.errorstr)
         return
      return

ERROR, DEBUG и VERBOSE - это просто макросы, которые печатают вывод на терминал.

Это решение ИМХО эффективно на 99,99%, так как оно все еще использует readlineфункцию блокировки , поэтому мы предполагаем, что подпроцесс хорош и выводит полные строки.

Я приветствую отзывы, чтобы улучшить решение, так как я все еще новичок в Python.

Брук Уоллес
источник
В этом конкретном случае вы можете установить stderr = subprocess.STDOUT в конструкторе Popen и получить весь вывод из cmd.stdout.readline ().
Аарон
Хороший четкий пример. Были проблемы с select.select (), но это помогло мне.
maharvey67
0

Опираясь на ответ Дж. Ф. Себастьяна и несколько других источников, я собрал простой менеджер подпроцессов. Он обеспечивает запрос неблокируемого чтения, а также запускает несколько процессов параллельно. Он не использует какой-либо специфичный для ОС вызов (который я знаю) и поэтому должен работать где угодно.

Это доступно от pypi, так просто pip install shelljob. Обратитесь к странице проекта для примеров и полных документов.

эд-ка морт-ора-у
источник
0

РЕДАКТИРОВАТЬ: эта реализация по-прежнему блокирует. Вместо этого используйте ответ JFSebastian .

Я попробовал лучший ответ , но дополнительный риск и обслуживание кода потока вызывали беспокойство.

Просматривая модуль io (и ограничиваясь 2.6), я нашел BufferedReader. Это моё безрезультатное, неблокирующее решение.

import io
from subprocess import PIPE, Popen

p = Popen(['myprogram.exe'], stdout=PIPE)

SLEEP_DELAY = 0.001

# Create an io.BufferedReader on the file descriptor for stdout
with io.open(p.stdout.fileno(), 'rb', closefd=False) as buffer:
  while p.poll() == None:
      time.sleep(SLEEP_DELAY)
      while '\n' in bufferedStdout.peek(bufferedStdout.buffer_size):
          line = buffer.readline()
          # do stuff with the line

  # Handle any remaining output after the process has ended
  while buffer.peek():
    line = buffer.readline()
    # do stuff with the line
romc
источник
ты пробовал for line in iter(p.stdout.readline, ""): # do stuff with the line? Он без потоков (однопоточный) и блокирует, когда ваш код блокируется.
JFS
@ jf-sebastian Да, в конце концов я вернулся к твоему ответу. Моя реализация все еще иногда блокируется. Я отредактирую свой ответ, чтобы предупредить других не идти по этому пути.
romc
0

Недавно я наткнулся на ту же проблему, мне нужно прочитать одну строку за раз из потока (хвостовой запуск в подпроцессе) в неблокирующем режиме. Я хотел избежать следующих проблем: не записывать процессор, не читать поток одним байтом ( как readline сделал) и т. д.

Вот моя реализация https://gist.github.com/grubberr/5501e1a9760c3eab5e0a, она не поддерживает Windows (опрос), не обрабатывает EOF, но она хорошо работает для меня

grubberr
источник
ответ нить на основе никак не сгореть процессор (вы можете указать произвольное , timeoutкак в вашем решении) и .readline()читает более чем один байт в то время ( bufsize=1средства от прямой линии -buffered (подходит только для записи)). Какие еще проблемы вы нашли? Ответы только на ссылки не очень полезны.
Jfs
0

Это пример запуска интерактивной команды в подпроцессе, и стандартный вывод является интерактивным с использованием псевдотерминала. Вы можете обратиться к: https://stackoverflow.com/a/43012138/3555925

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import os
import sys
import select
import termios
import tty
import pty
from subprocess import Popen

command = 'bash'
# command = 'docker run -it --rm centos /bin/bash'.split()

# save original tty setting then set it to raw mode
old_tty = termios.tcgetattr(sys.stdin)
tty.setraw(sys.stdin.fileno())

# open pseudo-terminal to interact with subprocess
master_fd, slave_fd = pty.openpty()

# use os.setsid() make it run in a new process group, or bash job control will not be enabled
p = Popen(command,
          preexec_fn=os.setsid,
          stdin=slave_fd,
          stdout=slave_fd,
          stderr=slave_fd,
          universal_newlines=True)

while p.poll() is None:
    r, w, e = select.select([sys.stdin, master_fd], [], [])
    if sys.stdin in r:
        d = os.read(sys.stdin.fileno(), 10240)
        os.write(master_fd, d)
    elif master_fd in r:
        o = os.read(master_fd, 10240)
        if o:
            os.write(sys.stdout.fileno(), o)

# restore tty settings back
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)
Liao
источник
0

Это решение использует selectмодуль для «чтения любых доступных данных» из потока ввода-вывода. Эта функция первоначально блокируется до тех пор, пока данные не станут доступны, но затем считывает только те данные, которые доступны и не блокируются в дальнейшем.

Учитывая тот факт, что он использует selectмодуль, это работает только на Unix.

Код полностью PEP8-совместимый.

import select


def read_available(input_stream, max_bytes=None):
    """
    Blocks until any data is available, then all available data is then read and returned.
    This function returns an empty string when end of stream is reached.

    Args:
        input_stream: The stream to read from.
        max_bytes (int|None): The maximum number of bytes to read. This function may return fewer bytes than this.

    Returns:
        str
    """
    # Prepare local variables
    input_streams = [input_stream]
    empty_list = []
    read_buffer = ""

    # Initially block for input using 'select'
    if len(select.select(input_streams, empty_list, empty_list)[0]) > 0:

        # Poll read-readiness using 'select'
        def select_func():
            return len(select.select(input_streams, empty_list, empty_list, 0)[0]) > 0

        # Create while function based on parameters
        if max_bytes is not None:
            def while_func():
                return (len(read_buffer) < max_bytes) and select_func()
        else:
            while_func = select_func

        while True:
            # Read single byte at a time
            read_data = input_stream.read(1)
            if len(read_data) == 0:
                # End of stream
                break
            # Append byte to string buffer
            read_buffer += read_data
            # Check if more data is available
            if not while_func():
                break

    # Return read buffer
    return read_buffer
Брэдли Оделл
источник
0

Я также столкнулся с проблемой, описанной Джесси, и решил ее, используя «select», как это сделали Брэдли , Энди и другие, но в режиме блокировки, чтобы избежать петли занятости. Он использует фиктивную трубу как поддельный стандарт. Блоки выбора и ждут, когда будет готов либо стандартный ввод, либо канал. Когда клавиша нажата, stdin разблокирует выбор, и значение клавиши можно получить с помощью read (1). Когда другой поток записывает в канал, канал разблокирует выбор, и это может быть воспринято как указание на то, что потребность в stdin закончена. Вот некоторый ссылочный код:

import sys
import os
from select import select

# -------------------------------------------------------------------------    
# Set the pipe (fake stdin) to simulate a final key stroke
# which will unblock the select statement
readEnd, writeEnd = os.pipe()
readFile = os.fdopen(readEnd)
writeFile = os.fdopen(writeEnd, "w")

# -------------------------------------------------------------------------
def getKey():

    # Wait for stdin or pipe (fake stdin) to be ready
    dr,dw,de = select([sys.__stdin__, readFile], [], [])

    # If stdin is the one ready then read it and return value
    if sys.__stdin__ in dr:
        return sys.__stdin__.read(1)   # For Windows use ----> getch() from module msvcrt

    # Must finish
    else:
        return None

# -------------------------------------------------------------------------
def breakStdinRead():
    writeFile.write(' ')
    writeFile.flush()

# -------------------------------------------------------------------------
# MAIN CODE

# Get key stroke
key = getKey()

# Keyboard input
if key:
    # ... do your stuff with the key value

# Faked keystroke
else:
    # ... use of stdin finished

# -------------------------------------------------------------------------
# OTHER THREAD CODE

breakStdinRead()
gonzaedu61
источник
ПРИМЕЧАНИЕ. Чтобы это работало в Windows, труба должна быть заменена на гнездо. Я еще не пробовал, но это должно работать в соответствии с документацией.
gonzaedu61
0

Попробуйте wexpect , который является альтернативой pexpect для Windows .

import wexpect

p = wexpect.spawn('myprogram.exe')
p.stdout.readline('.')               // regex pattern of any character
output_str = p.after()
betontalpfa
источник
0

В Unix-подобных системах и Python 3.5+ есть os.set_blockingименно то, что написано.

import os
import time
import subprocess

cmd = 'python3', '-c', 'import time; [(print(i), time.sleep(1)) for i in range(5)]'
p = subprocess.Popen(cmd, stdout=subprocess.PIPE)
os.set_blocking(p.stdout.fileno(), False)
start = time.time()
while True:
    # first iteration always produces empty byte string in non-blocking mode
    for i in range(2):    
        line = p.stdout.readline()
        print(i, line)
        time.sleep(0.5)
    if time.time() > start + 5:
        break
p.terminate()

Это выводит:

1 b''
2 b'0\n'
1 b''
2 b'1\n'
1 b''
2 b'2\n'
1 b''
2 b'3\n'
1 b''
2 b'4\n'

С os.set_blockingкомментариями это:

0 b'0\n'
1 b'1\n'
0 b'2\n'
1 b'3\n'
0 b'4\n'
1 b''
SAAJ
источник
-2

Вот модуль, который поддерживает неблокирующие чтения и фоновые записи в python:

https://pypi.python.org/pypi/python-nonblock

Обеспечивает функцию,

nonblock_read, который будет считывать данные из потока, если он доступен, в противном случае возвращает пустую строку (или None, если поток закрыт с другой стороны и все возможные данные были прочитаны)

Вы также можете рассмотреть модуль python-subprocess2,

https://pypi.python.org/pypi/python-subprocess2

который добавляет в модуль подпроцесса. Поэтому к объекту, возвращаемому из «subprocess.Popen», добавляется дополнительный метод runInBackground. Это запускает поток и возвращает объект, который будет автоматически заполняться, когда материал записывается в stdout / stderr, без блокировки вашего основного потока.

Наслаждайтесь!

Тим Саванна
источник
Я хотел бы попробовать этот неблокирующий модуль, но я относительно новичок в некоторых процедурах Linux. Как именно установить эти подпрограммы? Я использую Raspbian Jessie, разновидность Debian Linux для Raspberry Pi. Я пробовал 'sudo apt-get install nonblock' и python-nonblock, и оба выдавали ошибку - не найдено. Я скачал zip-файл с этого сайта pypi.python.org/pypi/python-nonblock , но не знаю, что с ним делать. Спасибо .... RDK
RDK