Как использовать MDC с пулами потоков?

146

В нашем программном обеспечении мы широко используем MDC для отслеживания таких вещей, как идентификаторы сеансов и имена пользователей для веб-запросов. Это отлично работает при запуске в оригинальной теме. Тем не менее, есть много вещей, которые нужно обрабатывать в фоновом режиме. Для этого мы используем java.concurrent.ThreadPoolExecutorи java.util.Timerклассы вместе с некоторыми самостоятельно прокаткой услуг исполнения асинхронных. Все эти сервисы управляют своим собственным пулом потоков.

Вот что говорит руководство Logback об использовании MDC в такой среде:

Копия сопоставленного диагностического контекста не всегда может быть унаследована рабочими потоками из инициирующего потока. Это тот случай, когда java.util.concurrent.Executors используется для управления потоками. Например, метод newCachedThreadPool создает ThreadPoolExecutor и, как и другой код пула потоков, имеет сложную логику создания потоков.

В таких случаях рекомендуется, чтобы MDC.getCopyOfContextMap () вызывался в исходном (главном) потоке перед отправкой задачи исполнителю. Когда задача выполняется в качестве первого действия, она должна вызвать MDC.setContextMapValues ​​(), чтобы связать сохраненную копию исходных значений MDC с новым управляемым потоком Executor.

Это было бы хорошо, но добавить эти вызовы очень легко, и нет простого способа распознать проблему, пока не станет слишком поздно. Единственным признаком Log4j является то, что вы пропускаете информацию MDC в журналах, а с помощью Logback вы получаете устаревшую информацию MDC (поскольку поток в пуле протектора наследует свой MDC от первой задачи, которая была на нем запущена). Оба являются серьезными проблемами в производственной системе.

Я не считаю нашу ситуацию особенной в любом случае, но я не мог найти много об этой проблеме в Интернете. Очевидно, что это не то, с чем сталкиваются многие люди, поэтому должен быть способ избежать этого. Что мы здесь делаем не так?

Лоран Пинтер
источник
1
Если ваше приложение развернуто в среде JEE, вы можете использовать перехватчики Java для установки контекста MDC перед вызовом EJB.
Максим Кирилов
2
Начиная с версии 1.1.5, значения MDC больше не наследуются дочерними потоками.
Ceki
jira.qos.ch/browse/LOGBACK-422 решено
lyjackal
2
@Ceki Необходимо обновить документацию: «Дочерний поток автоматически наследует копию сопоставленного диагностического контекста своего родителя». logback.qos.ch/manual/mdc.html
Штеффен
Я создал запрос на извлечение к slf4j, который решает проблему использования MDC в разных потоках (ссылка github.com/qos-ch/slf4j/pull/150 ). Может быть, если люди прокомментируют и попросят об этом, они внесут изменения в SLF4J :)
Мужчина

Ответы:

79

Да, это общая проблема, с которой я столкнулся. Есть несколько обходных путей (например, ручная настройка, как описано), но в идеале вам нужно решение, которое

  • Устанавливает MDC последовательно;
  • Избегает неявных ошибок, когда MDC неверен, но вы этого не знаете; и
  • Минимизирует изменения в том , как использовать пулы потоков (например , подклассы Callableс MyCallableвезде, или подобным уродством).

Вот решение, которое я использую, которое отвечает этим трем потребностям. Код должен быть понятен.

(Как примечание, этот исполнитель может быть создан и передан Guava MoreExecutors.listeningDecorator(), если вы используете Guava ListanableFuture.)

import org.slf4j.MDC;

import java.util.Map;
import java.util.concurrent.*;

/**
 * A SLF4J MDC-compatible {@link ThreadPoolExecutor}.
 * <p/>
 * In general, MDC is used to store diagnostic information (e.g. a user's session id) in per-thread variables, to facilitate
 * logging. However, although MDC data is passed to thread children, this doesn't work when threads are reused in a
 * thread pool. This is a drop-in replacement for {@link ThreadPoolExecutor} sets MDC data before each task appropriately.
 * <p/>
 * Created by jlevy.
 * Date: 6/14/13
 */
public class MdcThreadPoolExecutor extends ThreadPoolExecutor {

    final private boolean useFixedContext;
    final private Map<String, Object> fixedContext;

    /**
     * Pool where task threads take MDC from the submitting thread.
     */
    public static MdcThreadPoolExecutor newWithInheritedMdc(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                                                            TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        return new MdcThreadPoolExecutor(null, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    /**
     * Pool where task threads take fixed MDC from the thread that creates the pool.
     */
    @SuppressWarnings("unchecked")
    public static MdcThreadPoolExecutor newWithCurrentMdc(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                                                          TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        return new MdcThreadPoolExecutor(MDC.getCopyOfContextMap(), corePoolSize, maximumPoolSize, keepAliveTime, unit,
                workQueue);
    }

    /**
     * Pool where task threads always have a specified, fixed MDC.
     */
    public static MdcThreadPoolExecutor newWithFixedMdc(Map<String, Object> fixedContext, int corePoolSize,
                                                        int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                                        BlockingQueue<Runnable> workQueue) {
        return new MdcThreadPoolExecutor(fixedContext, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    private MdcThreadPoolExecutor(Map<String, Object> fixedContext, int corePoolSize, int maximumPoolSize,
                                  long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        this.fixedContext = fixedContext;
        useFixedContext = (fixedContext != null);
    }

    @SuppressWarnings("unchecked")
    private Map<String, Object> getContextForTask() {
        return useFixedContext ? fixedContext : MDC.getCopyOfContextMap();
    }

    /**
     * All executions will have MDC injected. {@code ThreadPoolExecutor}'s submission methods ({@code submit()} etc.)
     * all delegate to this.
     */
    @Override
    public void execute(Runnable command) {
        super.execute(wrap(command, getContextForTask()));
    }

    public static Runnable wrap(final Runnable runnable, final Map<String, Object> context) {
        return new Runnable() {
            @Override
            public void run() {
                Map previous = MDC.getCopyOfContextMap();
                if (context == null) {
                    MDC.clear();
                } else {
                    MDC.setContextMap(context);
                }
                try {
                    runnable.run();
                } finally {
                    if (previous == null) {
                        MDC.clear();
                    } else {
                        MDC.setContextMap(previous);
                    }
                }
            }
        };
    }
}
jlevy
источник
Если предыдущий контекст не пуст, разве это не мусор? Почему вы носите это с собой?
Джек
2
Правильно; это не должно быть установлено. Это просто кажется хорошей гигиеной, например, если метод wrap () был выставлен и использован кем-то другим в будущем.
jlevy
Можете ли вы дать ссылку на то, как этот MdcThreadPoolExecutor был присоединен или на который ссылается Log4J2? Есть ли где-то, где нам нужно специально ссылаться на этот класс, или это делается «автоматически»? Я не использую гуаву. Я мог бы, но я хотел бы знать, есть ли какой-то другой способ до его использования.
JCB
Если я правильно понимаю ваш вопрос, ответ - да, это «волшебные» локальные переменные потока в SLF4J - см. Реализации MDC.setContextMap () и т. Д. Также, кстати, здесь используется SLF4J, а не Log4J, что предпочтительнее как это работает с Log4j, Logback и другими настройками журналирования.
июля
1
Просто для полноты: если вы используете Spring ThreadPoolTaskExecutorвместо обычной Java ThreadPoolExecutor, вы можете использовать MdcTaskDecoratorописанное на moelholm.com/2017/07/24/…
Pino
27

Мы столкнулись с подобной проблемой. Возможно, вы захотите расширить ThreadPoolExecutor и переопределить методы before / afterExecute, чтобы сделать необходимые вызовы MDC перед запуском / остановкой новых потоков.

отметка
источник
10
Методы beforeExecute(Thread, Runnable)и afterExecute(Runnable, Throwable)могут быть полезны в других случаях, но я не уверен, как это будет работать для настройки MDC. Они оба выполнены под порожденной нитью. Это означает, что вы должны иметь возможность получить обновленную карту из основного потока раньше beforeExecute.
Кенстон Чой
Лучше установить MDC в фильтре, то есть, когда запрос обрабатывается бизнес-логикой, контекст не будет обновляться. Я не думаю, что мы должны обновлять MDC везде в приложении
dereck
15

ИМХО, лучшим решением является:

  • использование ThreadPoolTaskExecutor
  • реализовать свой собственный TaskDecorator
  • используй это: executor.setTaskDecorator(new LoggingTaskDecorator());

Декоратор может выглядеть так:

private final class LoggingTaskDecorator implements TaskDecorator {

    @Override
    public Runnable decorate(Runnable task) {
        // web thread
        Map<String, String> webThreadContext = MDC.getCopyOfContextMap();
        return () -> {
            // work thread
            try {
                // TODO: is this thread safe?
                MDC.setContextMap(webThreadContext);
                task.run();
            } finally {
                MDC.clear();
            }
        };
    }

}
Томаш Мышик
источник
Извините, не совсем уверен, что вы имеете в виду. ОБНОВЛЕНИЕ: я думаю, что я вижу теперь, улучшит мой ответ.
Томаш Мышик
6

Вот как я делаю это с фиксированными пулами потоков и исполнителями:

ExecutorService executor = Executors.newFixedThreadPool(4);
Map<String, String> mdcContextMap = MDC.getCopyOfContextMap();

В поточной части:

executor.submit(() -> {
    MDC.setContextMap(mdcContextMap);
    // my stuff
});
Amaury D
источник
2

Подобно ранее опубликованным решениям, newTaskForметоды Runnableи Callableмогут быть перезаписаны для переноса аргумента (см. Принятое решение) при создании RunnableFuture.

Примечание: Следовательно, executorService«S submitметод должен быть вызван вместо executeметода.

Для ScheduledThreadPoolExecutor, decorateTaskметоды будут перезаписаны вместо этого.

Мой ключ_
источник
2

Если вы столкнулись с этой проблемой в среде, связанной @Asyncс Spring Framework, где вы запускаете задачи с помощью аннотации, вы можете украсить задачи с помощью подхода TaskDecorator . Пример того, как это сделать, представлен здесь: https://moelholm.com/blog/2017/07/24/spring-43-using-a-taskdecorator-to-copy-mdc-data-to-async-threads

Я столкнулся с этой проблемой, и статья выше помогла мне решить ее, поэтому я делюсь ею здесь.

Soner
источник
0

Другой вариант, аналогичный существующим здесь ответам, заключается в реализации ExecutorServiceи разрешении передачи ему делегата. Затем, используя дженерики, он все еще может выставлять фактического делегата в случае, если кто-то хочет получить некоторую статистику (пока никакие другие методы модификации не используются).

Код ссылки:

public class MDCExecutorService<D extends ExecutorService> implements ExecutorService {

    private final D delegate;

    public MDCExecutorService(D delegate) {
        this.delegate = delegate;
    }

    @Override
    public void shutdown() {
        delegate.shutdown();
    }

    @Override
    public List<Runnable> shutdownNow() {
        return delegate.shutdownNow();
    }

    @Override
    public boolean isShutdown() {
        return delegate.isShutdown();
    }

    @Override
    public boolean isTerminated() {
        return delegate.isTerminated();
    }

    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
        return delegate.awaitTermination(timeout, unit);
    }

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        return delegate.submit(wrap(task));
    }

    @Override
    public <T> Future<T> submit(Runnable task, T result) {
        return delegate.submit(wrap(task), result);
    }

    @Override
    public Future<?> submit(Runnable task) {
        return delegate.submit(wrap(task));
    }

    @Override
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
        return delegate.invokeAll(wrapCollection(tasks));
    }

    @Override
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
        return delegate.invokeAll(wrapCollection(tasks), timeout, unit);
    }

    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
        return delegate.invokeAny(wrapCollection(tasks));
    }

    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        return delegate.invokeAny(wrapCollection(tasks), timeout, unit);
    }

    @Override
    public void execute(Runnable command) {
        delegate.execute(wrap(command));
    }

    public D getDelegate() {
        return delegate;
    }

    /* Copied from https://github.com/project-ncl/pnc/blob/master/common/src/main/java/org/jboss/pnc/common
    /concurrent/MDCWrappers.java */

    private static Runnable wrap(final Runnable runnable) {
        final Map<String, String> context = MDC.getCopyOfContextMap();
        return () -> {
            Map previous = MDC.getCopyOfContextMap();
            if (context == null) {
                MDC.clear();
            } else {
                MDC.setContextMap(context);
            }
            try {
                runnable.run();
            } finally {
                if (previous == null) {
                    MDC.clear();
                } else {
                    MDC.setContextMap(previous);
                }
            }
        };
    }

    private static <T> Callable<T> wrap(final Callable<T> callable) {
        final Map<String, String> context = MDC.getCopyOfContextMap();
        return () -> {
            Map previous = MDC.getCopyOfContextMap();
            if (context == null) {
                MDC.clear();
            } else {
                MDC.setContextMap(context);
            }
            try {
                return callable.call();
            } finally {
                if (previous == null) {
                    MDC.clear();
                } else {
                    MDC.setContextMap(previous);
                }
            }
        };
    }

    private static <T> Consumer<T> wrap(final Consumer<T> consumer) {
        final Map<String, String> context = MDC.getCopyOfContextMap();
        return (t) -> {
            Map previous = MDC.getCopyOfContextMap();
            if (context == null) {
                MDC.clear();
            } else {
                MDC.setContextMap(context);
            }
            try {
                consumer.accept(t);
            } finally {
                if (previous == null) {
                    MDC.clear();
                } else {
                    MDC.setContextMap(previous);
                }
            }
        };
    }

    private static <T> Collection<Callable<T>> wrapCollection(Collection<? extends Callable<T>> tasks) {
        Collection<Callable<T>> wrapped = new ArrayList<>();
        for (Callable<T> task : tasks) {
            wrapped.add(wrap(task));
        }
        return wrapped;
    }
}
Кенстон Чой
источник
-3

Я смог решить это, используя следующий подход

В основном потоке (Application.java, точка входа моего приложения)

static public Map<String, String> mdcContextMap = MDC.getCopyOfContextMap();

В методе run класса, который вызывается Executer

MDC.setContextMap(Application.mdcContextMap);
smishra
источник