Как мне войти при использовании многопроцессорной обработки в Python?

239

Прямо сейчас у меня есть центральный модуль в платформе, которая порождает несколько процессов, используя multiprocessingмодуль Python 2.6 . Поскольку он использует multiprocessing, существует журнал, поддерживающий многопроцессорность на уровне модуля LOG = multiprocessing.get_logger(). Согласно документам , этот регистратор имеет общие блокировки процессов, так что вы не можете вписаться sys.stderr(или в какой-либо другой дескриптор файла), имея несколько процессов, пишущих в него одновременно.

У меня сейчас проблема в том, что другие модули в фреймворке не поддерживают многопроцессорность. Как мне кажется, мне нужно сделать так, чтобы все зависимости этого центрального модуля использовали многопроцессорное ведение журнала. Это раздражает в рамках, не говоря уже о всех клиентах платформы. Есть ли альтернативы, о которых я не думаю?

cdleary
источник
10
В документах, на которые вы ссылаетесь, говорится прямо противоположное тому, что вы говорите, что регистратор не имеет общих блокировок процесса, и все перепутано - проблема, с которой я столкнулся.
Себастьян Бласк
3
см. примеры в документации stdlib: ведение журнала в одном файле из нескольких процессов . Рецепты не требуют, чтобы другие модули учитывали многопроцессорность.
JFS
Итак, для чего нужен вариант multiprocessing.get_logger()? Похоже, что на основе этих других способов ведения журнала функциональность регистрации не multiprocessingимеет большого значения.
Тим Людвински
4
get_logger()это регистратор, используемый самим multiprocessingмодулем. Это полезно, если вы хотите отладить multiprocessingпроблему.
Jfs

Ответы:

69

Единственный способ справиться с этим ненавязчиво - это:

  1. Создайте каждый рабочий процесс так, чтобы его журнал отправлялся в другой файловый дескриптор (на диск или в канал). В идеале все записи журнала должны иметь метку времени.
  2. Ваш процесс контроллера может затем выполнить одно из следующих действий:
    • При использовании файлов на диске: объедините файлы журналов в конце цикла, отсортированные по метке времени
    • При использовании каналов (рекомендуется): Объедините записи журнала на лету из всех каналов в центральный файл журнала. (Например, периодически selectиз файловых дескрипторов каналов выполняйте сортировку слиянием доступных записей журнала и переходите к централизованному журналу. Повторите.)
vladr
источник
Хорошо, это было 35 лет, прежде чем я думал об этом (думал, что я буду использовать atexit:-). Проблема в том, что он не даст вам показания в реальном времени. Это может быть частью цены многопроцессорной обработки, а не многопоточности.
cledary
@cdleary, используя конвейерный подход, это было бы как можно ближе к реальному времени (особенно если stderr не буферизован в порожденных процессах.)
vladr
1
Кстати, большое предположение здесь: не Windows. Вы на Windows?
Влад
22
Почему бы просто не использовать multiprocessing.Queue и поток регистрации в основном процессе? Кажется проще.
Брэндон Роудс
1
@BrandonRhodes - Как я уже сказал, ненавязчиво . Использование multiprocessing.Queueне будет проще, если нужно переписать много кода multiprocessing.Queueи / или если проблема в производительности
vladr
122

Я только что написал собственный обработчик журнала, который просто передает все в родительский процесс через канал. Я тестировал его всего десять минут, но, похоже, он работает довольно хорошо.

( Примечание: это жестко запрограммировано RotatingFileHandler, это мой собственный вариант использования.)


Обновление: @javier теперь поддерживает этот подход как пакет, доступный в Pypi - см. Multiprocessing -logging on Pypi, github по адресу https://github.com/jruere/multiprocessing-logging


Обновление: реализация!

Теперь он использует очередь для правильной обработки параллелизма, а также корректно восстанавливается после ошибок. Я уже использую это в производстве в течение нескольких месяцев, и текущая версия ниже работает без проблем.

from logging.handlers import RotatingFileHandler
import multiprocessing, threading, logging, sys, traceback

class MultiProcessingLog(logging.Handler):
    def __init__(self, name, mode, maxsize, rotate):
        logging.Handler.__init__(self)

        self._handler = RotatingFileHandler(name, mode, maxsize, rotate)
        self.queue = multiprocessing.Queue(-1)

        t = threading.Thread(target=self.receive)
        t.daemon = True
        t.start()

    def setFormatter(self, fmt):
        logging.Handler.setFormatter(self, fmt)
        self._handler.setFormatter(fmt)

    def receive(self):
        while True:
            try:
                record = self.queue.get()
                self._handler.emit(record)
            except (KeyboardInterrupt, SystemExit):
                raise
            except EOFError:
                break
            except:
                traceback.print_exc(file=sys.stderr)

    def send(self, s):
        self.queue.put_nowait(s)

    def _format_record(self, record):
        # ensure that exc_info and args
        # have been stringified.  Removes any chance of
        # unpickleable things inside and possibly reduces
        # message size sent over the pipe
        if record.args:
            record.msg = record.msg % record.args
            record.args = None
        if record.exc_info:
            dummy = self.format(record)
            record.exc_info = None

        return record

    def emit(self, record):
        try:
            s = self._format_record(record)
            self.send(s)
        except (KeyboardInterrupt, SystemExit):
            raise
        except:
            self.handleError(record)

    def close(self):
        self._handler.close()
        logging.Handler.close(self)
zzzeek
источник
4
Приведенный выше обработчик выполняет запись всего файла из родительского процесса и использует только один поток для получения сообщений, переданных от дочерних процессов. Если вы вызываете сам обработчик из порожденного дочернего процесса, то он использует его неправильно, и вы получите все те же проблемы, что и RotatingFileHandler. Я использовал приведенный выше код годами без проблем.
zzzeek
9
К сожалению, этот подход не работает в Windows. От docs.python.org/library/multiprocessing.html 16.6.2.12 «Обратите внимание, что в Windows дочерние процессы будут наследовать только уровень регистратора родительского процесса - любые другие настройки регистратора не будут унаследованы». Подпроцессы не наследуют обработчик, и вы не можете передать его явно, потому что он не может быть выбран.
Ной Йеттер
2
Стоит отметить, что multiprocessing.Queueиспользует поток, чтобы в put(). Поэтому не вызывайте put(т. Е. Регистрируйте сообщения с помощью MultiProcessingLogобработчика) перед созданием всех подпроцессов. В противном случае поток будет мертв в дочернем процессе. Одним из решений является вызов Queue._after_fork()в начале каждого дочернего процесса или использование multiprocessing.queues.SimpleQueueвместо него, которое не связано с потоком, но блокирует.
Данки Ван
5
Не могли бы вы добавить простой пример, который показывает инициализацию, а также использование из гипотетического дочернего процесса? Я не совсем уверен, как дочерний процесс должен получить доступ к очереди без создания экземпляра другого класса.
JesseBuesking
11
@zzzeek, ​​это решение хорошо, но я не смог найти пакет с ним или что-то подобное, поэтому я создал один называется multiprocessing-logging.
Хавьер
30

QueueHandlerявляется родным в Python 3.2+, и делает именно это. Это легко копируется в предыдущих версиях.

Документы Python содержат два полных примера: ведение журнала в один файл из нескольких процессов

Для тех, кто использует Python <3.2, просто скопируйте QueueHandlerв свой собственный код с: https://gist.github.com/vsajip/591589 или альтернативно импортируйте logutils .

Каждый процесс (включая родительский процесс) помещает свою Queueзапись в журнал , а затем listenerпоток или процесс (для каждого из них предоставляется один пример) выбирает их и записывает их все в файл - без риска повреждения или искажения.

fantabolous
источник
21

Ниже приведено еще одно решение с упором на простоту для всех, кто (как и я), попал сюда из Google. Регистрация должна быть легкой! Только для 3.2 или выше.

import multiprocessing
import logging
from logging.handlers import QueueHandler, QueueListener
import time
import random


def f(i):
    time.sleep(random.uniform(.01, .05))
    logging.info('function called with {} in worker thread.'.format(i))
    time.sleep(random.uniform(.01, .05))
    return i


def worker_init(q):
    # all records from worker processes go to qh and then into q
    qh = QueueHandler(q)
    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    logger.addHandler(qh)


def logger_init():
    q = multiprocessing.Queue()
    # this is the handler for all log records
    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter("%(levelname)s: %(asctime)s - %(process)s - %(message)s"))

    # ql gets records from the queue and sends them to the handler
    ql = QueueListener(q, handler)
    ql.start()

    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    # add the handler to the logger so records from this process are handled
    logger.addHandler(handler)

    return ql, q


def main():
    q_listener, q = logger_init()

    logging.info('hello from main thread')
    pool = multiprocessing.Pool(4, worker_init, [q])
    for result in pool.map(f, range(10)):
        pass
    pool.close()
    pool.join()
    q_listener.stop()

if __name__ == '__main__':
    main()
user2133814
источник
2
QueueHandlerИ QueueListenerклассы могут быть использованы на Python 2.7 , а также, доступны в logutilsпакете.
Лев Левицкий
5
Регистратор основного процесса также должен использовать QueueHandler. В вашем текущем коде основной процесс обходит очередь, поэтому между основным процессом и рабочим могут возникать расы. Каждый должен войти в очередь (через QueueHandler), и только QueueListener должен быть разрешен для входа в StreamHandler.
Исмаэль ЭЛЬ АТИФИ
Кроме того, вам не нужно инициализировать регистратор у каждого ребенка. Просто начните регистратор в родительском процессе, и получите регистратор в каждом дочернем процессе.
okwap
20

Еще одной альтернативой могут быть различные не-файловые обработчики журналирования в loggingпакете :

  • SocketHandler
  • DatagramHandler
  • SyslogHandler

(и другие)

Таким образом, вы могли бы легко иметь демон ведения журнала, в который вы могли бы писать безопасно и обрабатывать результаты правильно. (Например, простой сервер сокетов, который просто распаковывает сообщение и отправляет его в свой собственный обработчик вращающихся файлов.)

Это SyslogHandlerпозаботится и о тебе. Конечно, вы можете использовать свой собственный экземпляр syslog, а не системный.

Али Афшар
источник
13

Вариант других, в котором ведение журнала и потока очереди разделены.

"""sample code for logging in subprocesses using multiprocessing

* Little handler magic - The main process uses loggers and handlers as normal.
* Only a simple handler is needed in the subprocess that feeds the queue.
* Original logger name from subprocess is preserved when logged in main
  process.
* As in the other implementations, a thread reads the queue and calls the
  handlers. Except in this implementation, the thread is defined outside of a
  handler, which makes the logger definitions simpler.
* Works with multiple handlers.  If the logger in the main process defines
  multiple handlers, they will all be fed records generated by the
  subprocesses loggers.

tested with Python 2.5 and 2.6 on Linux and Windows

"""

import os
import sys
import time
import traceback
import multiprocessing, threading, logging, sys

DEFAULT_LEVEL = logging.DEBUG

formatter = logging.Formatter("%(levelname)s: %(asctime)s - %(name)s - %(process)s - %(message)s")

class SubProcessLogHandler(logging.Handler):
    """handler used by subprocesses

    It simply puts items on a Queue for the main process to log.

    """

    def __init__(self, queue):
        logging.Handler.__init__(self)
        self.queue = queue

    def emit(self, record):
        self.queue.put(record)

class LogQueueReader(threading.Thread):
    """thread to write subprocesses log records to main process log

    This thread reads the records written by subprocesses and writes them to
    the handlers defined in the main process's handlers.

    """

    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue
        self.daemon = True

    def run(self):
        """read from the queue and write to the log handlers

        The logging documentation says logging is thread safe, so there
        shouldn't be contention between normal logging (from the main
        process) and this thread.

        Note that we're using the name of the original logger.

        """
        # Thanks Mike for the error checking code.
        while True:
            try:
                record = self.queue.get()
                # get the logger for this record
                logger = logging.getLogger(record.name)
                logger.callHandlers(record)
            except (KeyboardInterrupt, SystemExit):
                raise
            except EOFError:
                break
            except:
                traceback.print_exc(file=sys.stderr)

class LoggingProcess(multiprocessing.Process):

    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue

    def _setupLogger(self):
        # create the logger to use.
        logger = logging.getLogger('test.subprocess')
        # The only handler desired is the SubProcessLogHandler.  If any others
        # exist, remove them. In this case, on Unix and Linux the StreamHandler
        # will be inherited.

        for handler in logger.handlers:
            # just a check for my sanity
            assert not isinstance(handler, SubProcessLogHandler)
            logger.removeHandler(handler)
        # add the handler
        handler = SubProcessLogHandler(self.queue)
        handler.setFormatter(formatter)
        logger.addHandler(handler)

        # On Windows, the level will not be inherited.  Also, we could just
        # set the level to log everything here and filter it in the main
        # process handlers.  For now, just set it from the global default.
        logger.setLevel(DEFAULT_LEVEL)
        self.logger = logger

    def run(self):
        self._setupLogger()
        logger = self.logger
        # and here goes the logging
        p = multiprocessing.current_process()
        logger.info('hello from process %s with pid %s' % (p.name, p.pid))


if __name__ == '__main__':
    # queue used by the subprocess loggers
    queue = multiprocessing.Queue()
    # Just a normal logger
    logger = logging.getLogger('test')
    handler = logging.StreamHandler()
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    logger.setLevel(DEFAULT_LEVEL)
    logger.info('hello from the main process')
    # This thread will read from the subprocesses and write to the main log's
    # handlers.
    log_queue_reader = LogQueueReader(queue)
    log_queue_reader.start()
    # create the processes.
    for i in range(10):
        p = LoggingProcess(queue)
        p.start()
    # The way I read the multiprocessing warning about Queue, joining a
    # process before it has finished feeding the Queue can cause a deadlock.
    # Also, Queue.empty() is not realiable, so just make sure all processes
    # are finished.
    # active_children joins subprocesses when they're finished.
    while multiprocessing.active_children():
        time.sleep(.1)
ironhacker
источник
Мне нравится идея извлечения имени регистратора из записи очереди. Это позволяет использовать обычный fileConfig()в MainProcess и едва настроенный регистратор в PoolWorkers (только с setLevel(logging.NOTSET)). Как я уже упоминал в другом комментарии, я использую пул, поэтому мне пришлось получать мою очередь (прокси-сервер) из диспетчера вместо многопроцессорной обработки, чтобы ее можно было засолить. Это позволяет мне передавать очередь работнику внутри словаря (большая часть которого получена из объекта argsparse vars()). Я чувствую, что, в конце концов, это лучший подход для MS Windows, в котором отсутствует функция fork () и не используется решение @zzzeak.
млт
@mlt Я думаю, что вы могли бы также поставить многопроцессорную очередь в init вместо использования Manager (см. ответ на stackoverflow.com/questions/25557686/… - о блокировках, но я верю, что это работает и для очередей)
fantabolous
@fantabolous Это не будет работать на MS Windows или любой другой платформе, которой не хватает fork. Таким образом, каждый процесс будет иметь свою собственную независимую бесполезную очередь. Второй подход в связанном Q / A не будет работать на таких платформах. Это способ непереносимого кода.
Млт
@mlt Интересно. Я использую Windows, и мне кажется, что она работает нормально - вскоре после того, как я в последний раз прокомментировал, я создал пул процессов, совместно использующих multiprocessing.Queueосновной процесс, и с тех пор использую его постоянно. Не претендую на понимание, почему это работает.
Фантабол
10

Все текущие решения слишком связаны с конфигурацией регистрации с помощью обработчика. Мое решение имеет следующую архитектуру и функции:

  • Вы можете использовать любую конфигурацию регистрации
  • Ведение журнала выполняется в потоке демона.
  • Безопасное отключение демона с помощью контекстного менеджера
  • Связь с потоком регистрации осуществляется multiprocessing.Queue
  • В подпроцессах logging.Logger(и уже определенных экземплярах) вносятся исправления для отправки всех записей в очередь
  • Новое : отформатировать трассировку и сообщение перед отправкой в ​​очередь, чтобы избежать ошибок посадки

Код с примером использования и выходом можно найти по следующему адресу : https://gist.github.com/schlamar/7003737

schlamar
источник
Если я что-то не упустил, это на самом деле не поток демонов, так как вы никогда не устанавливаете daemon_thread.daemonна True. Мне нужно было сделать это, чтобы заставить мою программу на Python правильно завершиться, когда в диспетчере контекста происходит исключение.
blah238
Мне также нужно было ловить, регистрировать и глотать исключения, выдаваемые целью, funcв logged_callпротивном случае исключение искажалось бы с другими зарегистрированными выходными данными. Вот моя модифицированная версия этого: gist.github.com/blah238/8ab79c4fe9cdb254f5c37abfc5dc85bf
blah238
8

Так как мы можем представлять многопроцессорное ведение журнала как много издателей и одного подписчика (слушателя), используя ZeroMQ для реализации обмена сообщениями PUB-SUB действительно является опцией.

Кроме того, модуль PyZMQ , привязки Python для ZMQ, реализует PUBHandler , который является объектом для публикации сообщений регистрации через сокет zmq.PUB.

В Интернете есть решение для централизованного ведения журналов из распределенного приложения с использованием PyZMQ и PUBHandler, которое может быть легко адаптировано для локальной работы с несколькими процессами публикации.

formatters = {
    logging.DEBUG: logging.Formatter("[%(name)s] %(message)s"),
    logging.INFO: logging.Formatter("[%(name)s] %(message)s"),
    logging.WARN: logging.Formatter("[%(name)s] %(message)s"),
    logging.ERROR: logging.Formatter("[%(name)s] %(message)s"),
    logging.CRITICAL: logging.Formatter("[%(name)s] %(message)s")
}

# This one will be used by publishing processes
class PUBLogger:
    def __init__(self, host, port=config.PUBSUB_LOGGER_PORT):
        self._logger = logging.getLogger(__name__)
        self._logger.setLevel(logging.DEBUG)
        self.ctx = zmq.Context()
        self.pub = self.ctx.socket(zmq.PUB)
        self.pub.connect('tcp://{0}:{1}'.format(socket.gethostbyname(host), port))
        self._handler = PUBHandler(self.pub)
        self._handler.formatters = formatters
        self._logger.addHandler(self._handler)

    @property
    def logger(self):
        return self._logger

# This one will be used by listener process
class SUBLogger:
    def __init__(self, ip, output_dir="", port=config.PUBSUB_LOGGER_PORT):
        self.output_dir = output_dir
        self._logger = logging.getLogger()
        self._logger.setLevel(logging.DEBUG)

        self.ctx = zmq.Context()
        self._sub = self.ctx.socket(zmq.SUB)
        self._sub.bind('tcp://*:{1}'.format(ip, port))
        self._sub.setsockopt(zmq.SUBSCRIBE, "")

        handler = handlers.RotatingFileHandler(os.path.join(output_dir, "client_debug.log"), "w", 100 * 1024 * 1024, 10)
        handler.setLevel(logging.DEBUG)
        formatter = logging.Formatter("%(asctime)s;%(levelname)s - %(message)s")
        handler.setFormatter(formatter)
        self._logger.addHandler(handler)

  @property
  def sub(self):
      return self._sub

  @property
  def logger(self):
      return self._logger

#  And that's the way we actually run things:

# Listener process will forever listen on SUB socket for incoming messages
def run_sub_logger(ip, event):
    sub_logger = SUBLogger(ip)
    while not event.is_set():
        try:
            topic, message = sub_logger.sub.recv_multipart(flags=zmq.NOBLOCK)
            log_msg = getattr(logging, topic.lower())
            log_msg(message)
        except zmq.ZMQError as zmq_error:
            if zmq_error.errno == zmq.EAGAIN:
                pass


# Publisher processes loggers should be initialized as follows:

class Publisher:
    def __init__(self, stop_event, proc_id):
        self.stop_event = stop_event
        self.proc_id = proc_id
        self._logger = pub_logger.PUBLogger('127.0.0.1').logger

     def run(self):
         self._logger.info("{0} - Sending message".format(proc_id))

def run_worker(event, proc_id):
    worker = Publisher(event, proc_id)
    worker.run()

# Starting subscriber process so we won't loose publisher's messages
sub_logger_process = Process(target=run_sub_logger,
                                 args=('127.0.0.1'), stop_event,))
sub_logger_process.start()

#Starting publisher processes
for i in range(MAX_WORKERS_PER_CLIENT):
    processes.append(Process(target=run_worker,
                                 args=(stop_event, i,)))
for p in processes:
    p.start()
Самуил
источник
6

Мне также нравится ответ zzzeek, ​​но Андре прав, что для предотвращения искажений требуется очередь. Мне немного повезло с трубкой, но я увидел искаженный звук, что несколько ожидаемо. Реализовать его оказалось сложнее, чем я думал, в частности из-за работы в Windows, где есть некоторые дополнительные ограничения в отношении глобальных переменных и прочего (см .: Как реализована многопроцессорная обработка Python в Windows? )

Но, наконец, я получил это работает. Этот пример, вероятно, не идеален, поэтому комментарии и предложения приветствуются. Он также не поддерживает настройку форматера или чего-либо другого, кроме корневого регистратора. По сути, вам нужно переустановить регистратор в каждом из процессов пула с очередью и настроить другие атрибуты в регистраторе.

Опять же, любые предложения о том, как сделать код лучше, приветствуются. Я, конечно, еще не знаю всех трюков Python :-)

import multiprocessing, logging, sys, re, os, StringIO, threading, time, Queue

class MultiProcessingLogHandler(logging.Handler):
    def __init__(self, handler, queue, child=False):
        logging.Handler.__init__(self)

        self._handler = handler
        self.queue = queue

        # we only want one of the loggers to be pulling from the queue.
        # If there is a way to do this without needing to be passed this
        # information, that would be great!
        if child == False:
            self.shutdown = False
            self.polltime = 1
            t = threading.Thread(target=self.receive)
            t.daemon = True
            t.start()

    def setFormatter(self, fmt):
        logging.Handler.setFormatter(self, fmt)
        self._handler.setFormatter(fmt)

    def receive(self):
        #print "receive on"
        while (self.shutdown == False) or (self.queue.empty() == False):
            # so we block for a short period of time so that we can
            # check for the shutdown cases.
            try:
                record = self.queue.get(True, self.polltime)
                self._handler.emit(record)
            except Queue.Empty, e:
                pass

    def send(self, s):
        # send just puts it in the queue for the server to retrieve
        self.queue.put(s)

    def _format_record(self, record):
        ei = record.exc_info
        if ei:
            dummy = self.format(record) # just to get traceback text into record.exc_text
            record.exc_info = None  # to avoid Unpickleable error

        return record

    def emit(self, record):
        try:
            s = self._format_record(record)
            self.send(s)
        except (KeyboardInterrupt, SystemExit):
            raise
        except:
            self.handleError(record)

    def close(self):
        time.sleep(self.polltime+1) # give some time for messages to enter the queue.
        self.shutdown = True
        time.sleep(self.polltime+1) # give some time for the server to time out and see the shutdown

    def __del__(self):
        self.close() # hopefully this aids in orderly shutdown when things are going poorly.

def f(x):
    # just a logging command...
    logging.critical('function number: ' + str(x))
    # to make some calls take longer than others, so the output is "jumbled" as real MP programs are.
    time.sleep(x % 3)

def initPool(queue, level):
    """
    This causes the logging module to be initialized with the necessary info
    in pool threads to work correctly.
    """
    logging.getLogger('').addHandler(MultiProcessingLogHandler(logging.StreamHandler(), queue, child=True))
    logging.getLogger('').setLevel(level)

if __name__ == '__main__':
    stream = StringIO.StringIO()
    logQueue = multiprocessing.Queue(100)
    handler= MultiProcessingLogHandler(logging.StreamHandler(stream), logQueue)
    logging.getLogger('').addHandler(handler)
    logging.getLogger('').setLevel(logging.DEBUG)

    logging.debug('starting main')

    # when bulding the pool on a Windows machine we also have to init the logger in all the instances with the queue and the level of logging.
    pool = multiprocessing.Pool(processes=10, initializer=initPool, initargs=[logQueue, logging.getLogger('').getEffectiveLevel()] ) # start worker processes
    pool.map(f, range(0,50))
    pool.close()

    logging.debug('done')
    logging.shutdown()
    print "stream output is:"
    print stream.getvalue()
Майк Миллер
источник
1
Интересно, if 'MainProcess' == multiprocessing.current_process().name:можно ли использовать вместо прохода child?
млн. В
В случае, если кто-то другой пытается использовать пул процессов вместо отдельных объектов процессов в Windows, стоит упомянуть, что диспетчер должен использоваться для передачи очереди подпроцессам, поскольку он не может быть выбран напрямую.
млн. В
Эта реализация работала хорошо для меня. Я изменил его для работы с произвольным числом обработчиков. Таким образом, вы можете сконфигурировать ваш корневой обработчик не многопроцессорным способом, тогда, когда будет безопасно создать очередь, передать туда корневые обработчики, удалить их и сделать этот единственный обработчик.
Jaxor24
3

просто опубликуйте где-нибудь свой экземпляр регистратора. таким образом, другие модули и клиенты могут использовать ваш API, чтобы получить регистратор без необходимости import multiprocessing.

Хавьер
источник
1
Проблема в том, что многопроцессорные регистраторы выглядят безымянными, поэтому вы не сможете легко расшифровать поток сообщений. Возможно, было бы возможно назвать их после создания, что сделало бы более разумным рассмотрение.
cledary
хорошо, опубликуйте один регистратор для каждого модуля или, лучше, экспортируйте разные замыкания, которые используют регистратор с именем модуля. смысл в том, чтобы позволить другим модулям использовать ваш API
Хавьер
Определенно разумно (и +1 от меня!), Но я бы упустил возможность просто import logging; logging.basicConfig(level=logging.DEBUG); logging.debug('spam!')из любого места и заставить его работать должным образом.
cledary
3
Это интересное явление, которое я вижу, когда использую Python, мы настолько привыкли к тому, что можем делать то, что хотим, в одну или две простые строки, что простой и логичный подход на других языках (например, для публикации многопроцессорного регистратора или переноса) это в аксессоре) все равно ощущается как бремя. :)
Kylotan
3

Мне понравился ответ zzzeek. Я бы просто заменил канал на очередь, поскольку, если несколько потоков / процессов используют один и тот же конец канала для создания сообщений журнала, они будут искажены.

Андре Круз
источник
У меня были некоторые проблемы с обработчиком, хотя сообщения не искажались, просто все перестало работать. Я поменял трубу на очередь, так как это более уместно. Однако ошибки, которые я получал, не были устранены этим - в конечном итоге я добавил try / кроме метода receive () - очень редко попытка зарегистрировать исключения завершится неудачей, и в результате все получится. После того, как я добавил команду try / исключением, она без проблем запускается неделями, и файл standarderr будет получать около двух ошибочных исключений в неделю.
zzzeek
2

Как насчет делегирования всего ведения журнала другому процессу, который считывает все записи журнала из очереди?

LOG_QUEUE = multiprocessing.JoinableQueue()

class CentralLogger(multiprocessing.Process):
    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue
        self.log = logger.getLogger('some_config')
        self.log.info("Started Central Logging process")

    def run(self):
        while True:
            log_level, message = self.queue.get()
            if log_level is None:
                self.log.info("Shutting down Central Logging process")
                break
            else:
                self.log.log(log_level, message)

central_logger_process = CentralLogger(LOG_QUEUE)
central_logger_process.start()

Просто поделитесь LOG_QUEUE с помощью любого из многопроцессорных механизмов или даже наследования, и все это прекрасно работает!

Саван
источник
1

У меня есть решение, похожее на Ironhacker, за исключением того, что я использую logging.exception в своем коде и обнаружил, что мне нужно отформатировать исключение, прежде чем передать его обратно через очередь, так как трассировки не могут быть задействованы:

class QueueHandler(logging.Handler):
    def __init__(self, queue):
        logging.Handler.__init__(self)
        self.queue = queue
    def emit(self, record):
        if record.exc_info:
            # can't pass exc_info across processes so just format now
            record.exc_text = self.formatException(record.exc_info)
            record.exc_info = None
        self.queue.put(record)
    def formatException(self, ei):
        sio = cStringIO.StringIO()
        traceback.print_exception(ei[0], ei[1], ei[2], None, sio)
        s = sio.getvalue()
        sio.close()
        if s[-1] == "\n":
            s = s[:-1]
        return s
Ричард Джонс
источник
Я нашел полный пример здесь .
Арье Лейб Таурог
1

Ниже приведен класс, который можно использовать в среде Windows, требуется ActivePython. Вы также можете наследовать для других обработчиков журналов (StreamHandler и т. Д.)

class SyncronizedFileHandler(logging.FileHandler):
    MUTEX_NAME = 'logging_mutex'

    def __init__(self , *args , **kwargs):

        self.mutex = win32event.CreateMutex(None , False , self.MUTEX_NAME)
        return super(SyncronizedFileHandler , self ).__init__(*args , **kwargs)

    def emit(self, *args , **kwargs):
        try:
            win32event.WaitForSingleObject(self.mutex , win32event.INFINITE)
            ret = super(SyncronizedFileHandler , self ).emit(*args , **kwargs)
        finally:
            win32event.ReleaseMutex(self.mutex)
        return ret

И вот пример, который демонстрирует использование:

import logging
import random , time , os , sys , datetime
from string import letters
import win32api , win32event
from multiprocessing import Pool

def f(i):
    time.sleep(random.randint(0,10) * 0.1)
    ch = random.choice(letters)
    logging.info( ch * 30)


def init_logging():
    '''
    initilize the loggers
    '''
    formatter = logging.Formatter("%(levelname)s - %(process)d - %(asctime)s - %(filename)s - %(lineno)d - %(message)s")
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)

    file_handler = SyncronizedFileHandler(sys.argv[1])
    file_handler.setLevel(logging.INFO)
    file_handler.setFormatter(formatter)
    logger.addHandler(file_handler)

#must be called in the parent and in every worker process
init_logging() 

if __name__ == '__main__':
    #multiprocessing stuff
    pool = Pool(processes=10)
    imap_result = pool.imap(f , range(30))
    for i , _ in enumerate(imap_result):
        pass
user6336812
источник
Вероятно, использование multiprocessing.Lock()вместо Windows Mutex сделает решение переносимым.
xmedeko
1

Вот мой простой взлом / обходной путь ... не самый полный, но легко модифицируемый и более простой для чтения и понимания, я думаю, чем любые другие ответы, которые я нашел до написания этого:

import logging
import multiprocessing

class FakeLogger(object):
    def __init__(self, q):
        self.q = q
    def info(self, item):
        self.q.put('INFO - {}'.format(item))
    def debug(self, item):
        self.q.put('DEBUG - {}'.format(item))
    def critical(self, item):
        self.q.put('CRITICAL - {}'.format(item))
    def warning(self, item):
        self.q.put('WARNING - {}'.format(item))

def some_other_func_that_gets_logger_and_logs(num):
    # notice the name get's discarded
    # of course you can easily add this to your FakeLogger class
    local_logger = logging.getLogger('local')
    local_logger.info('Hey I am logging this: {} and working on it to make this {}!'.format(num, num*2))
    local_logger.debug('hmm, something may need debugging here')
    return num*2

def func_to_parallelize(data_chunk):
    # unpack our args
    the_num, logger_q = data_chunk
    # since we're now in a new process, let's monkeypatch the logging module
    logging.getLogger = lambda name=None: FakeLogger(logger_q)
    # now do the actual work that happens to log stuff too
    new_num = some_other_func_that_gets_logger_and_logs(the_num)
    return (the_num, new_num)

if __name__ == '__main__':
    multiprocessing.freeze_support()
    m = multiprocessing.Manager()
    logger_q = m.Queue()
    # we have to pass our data to be parallel-processed
    # we also need to pass the Queue object so we can retrieve the logs
    parallelable_data = [(1, logger_q), (2, logger_q)]
    # set up a pool of processes so we can take advantage of multiple CPU cores
    pool_size = multiprocessing.cpu_count() * 2
    pool = multiprocessing.Pool(processes=pool_size, maxtasksperchild=4)
    worker_output = pool.map(func_to_parallelize, parallelable_data)
    pool.close() # no more tasks
    pool.join()  # wrap up current tasks
    # get the contents of our FakeLogger object
    while not logger_q.empty():
        print logger_q.get()
    print 'worker output contained: {}'.format(worker_output)
nmz787
источник
1

Есть этот отличный пакет

Пакет: https://pypi.python.org/pypi/multiprocessing-logging/

код: https://github.com/jruere/multiprocessing-logging

Установка:

pip install multiprocessing-logging

Затем добавьте:

import multiprocessing_logging

# This enables logs inside process
multiprocessing_logging.install_mp_handler()
Хуан Исаза
источник
3
Эта библиотека основана на другом комментарии к текущему сообщению SO: stackoverflow.com/a/894284/1698058 .
Крис Хант
Происхождение: stackoverflow.com/a/894284/1663382 Я ценю пример использования модуля в дополнение к документации на домашней странице.
Liquidgenius
0

Одна из альтернатив - записать протокол многопроцессорной обработки в известный файл и зарегистрировать atexitобработчик для присоединения к этим процессам, считав его обратно на stderr; тем не менее, вы не получите поток в реальном времени к выводимым сообщениям на stderr таким образом.

cdleary
источник
этот подход, который вы предлагаете ниже, идентичен подходу из вашего комментария здесь stackoverflow.com/questions/641420/…
iruvar
0

Если у вас есть взаимоблокировки, возникающие в комбинации блокировок, потоков и вилок в loggingмодуле, это сообщается в отчете об ошибке 6721 (см. Также связанный вопрос SO ).

Существует небольшая Fixup решение размещена здесь .

Тем не менее, это просто исправит любые потенциальные тупики в logging. Это не исправит, что вещи могут быть искажены. Смотрите другие ответы, представленные здесь.

Альберт
источник
0

Простейшая идея как упомянуто:

  • Захватите имя файла и идентификатор текущего процесса.
  • Настройте [WatchedFileHandler][1]. Причины этого обработчика подробно обсуждаются здесь , но вкратце существуют некоторые худшие условия гонки с другими обработчиками журналирования. У этого есть самое короткое окно для условия гонки.
    • Выберите путь для сохранения журналов, например / var / log / ...
user1460675
источник
0

Для тех, кому это может понадобиться, я написал декоратор для пакета multiprocessing_logging, который добавляет текущее имя процесса в журналы, так что становится понятно, кто что регистрирует.

Он также запускает install_mp_handler (), поэтому запускать его перед созданием пула становится бесполезно.

Это позволяет мне видеть, какой работник создает какие журналы сообщений.

Вот план с примером:

import sys
import logging
from functools import wraps
import multiprocessing
import multiprocessing_logging

# Setup basic console logger as 'logger'
logger = logging.getLogger()
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(logging.Formatter(u'%(asctime)s :: %(levelname)s :: %(message)s'))
logger.setLevel(logging.DEBUG)
logger.addHandler(console_handler)


# Create a decorator for functions that are called via multiprocessing pools
def logs_mp_process_names(fn):
    class MultiProcessLogFilter(logging.Filter):
        def filter(self, record):
            try:
                process_name = multiprocessing.current_process().name
            except BaseException:
                process_name = __name__
            record.msg = f'{process_name} :: {record.msg}'
            return True

    multiprocessing_logging.install_mp_handler()
    f = MultiProcessLogFilter()

    # Wraps is needed here so apply / apply_async know the function name
    @wraps(fn)
    def wrapper(*args, **kwargs):
        logger.removeFilter(f)
        logger.addFilter(f)
        return fn(*args, **kwargs)

    return wrapper


# Create a test function and decorate it
@logs_mp_process_names
def test(argument):
    logger.info(f'test function called via: {argument}')


# You can also redefine undecored functions
def undecorated_function():
    logger.info('I am not decorated')


@logs_mp_process_names
def redecorated(*args, **kwargs):
    return undecorated_function(*args, **kwargs)


# Enjoy
if __name__ == '__main__':
    with multiprocessing.Pool() as mp_pool:
        # Also works with apply_async
        mp_pool.apply(test, ('mp pool',))
        mp_pool.apply(redecorated)
        logger.info('some main logs')
        test('main program')
Орсирис де Йонг
источник
-5

Моим детям, которые десятилетиями сталкивались с той же проблемой и нашли этот вопрос на этом сайте, я оставляю этот ответ.

Простота против усложнения. Просто используйте другие инструменты. Python потрясающий, но он не был предназначен для некоторых вещей.

Следующий фрагмент демона logrotate работает для меня и не слишком усложняет ситуацию. Запланируйте это, чтобы работать ежечасно и

/var/log/mylogfile.log {
    size 1
    copytruncate
    create
    rotate 10
    missingok
    postrotate
        timeext=`date -d '1 hour ago' "+%Y-%m-%d_%H"`
        mv /var/log/mylogfile.log.1 /var/log/mylogfile-$timeext.log
    endscript
}

Вот как я его устанавливаю (символические ссылки не работают для logrotate):

sudo cp /directpath/config/logrotate/myconfigname /etc/logrotate.d/myconfigname
sudo cp /etc/cron.daily/logrotate /etc/cron.hourly/logrotate
Бальдр
источник