Обработка исключений из задач Java ExecutorService

213

Я пытаюсь использовать ThreadPoolExecutorкласс Java для запуска большого количества тяжеловесных задач с фиксированным количеством потоков. У каждой из задач есть много мест, в которых она может потерпеть неудачу из-за исключений.

Я разделил на подклассы ThreadPoolExecutorи переопределил afterExecuteметод, который должен обеспечивать любые неперехваченные исключения, возникающие при выполнении задачи. Тем не менее, я не могу заставить его работать.

Например:

public class ThreadPoolErrors extends ThreadPoolExecutor {
    public ThreadPoolErrors() {
        super(  1, // core threads
                1, // max threads
                1, // timeout
                TimeUnit.MINUTES, // timeout units
                new LinkedBlockingQueue<Runnable>() // work queue
        );
    }

    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        if(t != null) {
            System.out.println("Got an error: " + t);
        } else {
            System.out.println("Everything's fine--situation normal!");
        }
    }

    public static void main( String [] args) {
        ThreadPoolErrors threadPool = new ThreadPoolErrors();
        threadPool.submit( 
                new Runnable() {
                    public void run() {
                        throw new RuntimeException("Ouch! Got an error.");
                    }
                }
        );
        threadPool.shutdown();
    }
}

Вывод этой программы: «Все хорошо - ситуация нормальная!» даже если единственный Runnable, представленный в пул потоков, генерирует исключение. Любой ключ к тому, что здесь происходит?

Спасибо!

Том
источник
Вы никогда не спрашивали о будущем задачи, что там произошло. Весь исполнитель службы или программа не будет аварийно завершена. Исключение перехватывается и переносится в ExecutionException. И будет ли он переброшен, если вы вызовете future.get (). PS: future.isDone () [Пожалуйста, прочитайте настоящее имя API] вернет true, даже когда runnable закончил по ошибке. Потому что задача выполнена по-настоящему.
Джай Пандит

Ответы:

156

Из документов :

Примечание. Когда действия включены в задачи (например, FutureTask) либо явно, либо с помощью таких методов, как submit, эти объекты задач перехватывают и поддерживают вычислительные исключения, поэтому они не вызывают внезапного завершения, а внутренние исключения не передаются этому методу. ,

Когда вы отправите Runnable, он будет помещен в будущее.

Ваш afterExecute должен быть примерно таким:

public final class ExtendedExecutor extends ThreadPoolExecutor {

    // ...

    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        if (t == null && r instanceof Future<?>) {
            try {
                Future<?> future = (Future<?>) r;
                if (future.isDone()) {
                    future.get();
                }
            } catch (CancellationException ce) {
                t = ce;
            } catch (ExecutionException ee) {
                t = ee.getCause();
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
        }
        if (t != null) {
            System.out.println(t);
        }
    }
}
н.у.к.
источник
7
Спасибо, я в конечном итоге использовал это решение. Кроме того, на случай, если кому-то будет интересно: другие предложили не создавать подклассы ExecutorService, но я все равно сделал это, потому что хотел отслеживать задачи по мере их выполнения, а не ждать завершения всех из них, а затем вызывать get () для всех возвращенных Futures. ,
Том
1
Другой подход к созданию подкласса исполнителя - это создание подкласса FutureTask и переопределение его метода done.
nos
1
Tom >> Можете ли вы опубликовать пример кода, где вы подкласс ExecutorService для мониторинга задач по мере их выполнения ...
создали
1
Этот ответ не будет работать, если вы используете ComplableFuture.runAsync, так как afterExecute будет содержать объект, который является частным пакетом, и не имеет доступа к объекту throwable. Я обошел это, завернув вызов. Смотрите мой ответ ниже.
ммм
2
Должны ли мы проверить, завершено ли будущее с помощью future.isDone()? Поскольку afterExecuteвыполняется после Runnableзавершения, я предполагаю, что future.isDone()всегда возвращается true.
Searene
247

ВНИМАНИЕ : Следует отметить, что это решение заблокирует вызывающий поток.


Если вы хотите обрабатывать исключения, сгенерированные задачей, то лучше использовать Callable, чем Runnable.

Callable.call() разрешено выдавать проверенные исключения, и они передаются обратно вызывающему потоку:

Callable task = ...
Future future = executor.submit(task);
try {
   future.get();
} catch (ExecutionException ex) {
   ex.getCause().printStackTrace();
}

Если Callable.call()выбрасывает исключение, это будет завернуто в ExecutionExceptionи брошено Future.get().

Это, вероятно, будет намного предпочтительнее, чем создание подклассов ThreadPoolExecutor. Это также дает вам возможность повторно отправить задачу, если исключение является восстанавливаемым.

skaffman
источник
5
> Callable.call () разрешено генерировать проверенные исключения, и они передаются обратно вызывающему потоку: обратите внимание, что выброшенное исключение будет распространяться на вызывающий поток, только если future.get()вызвана его перегруженная версия.
nhylated
16
Это прекрасно, но что делать, если я запускаю задачи параллельно и не хочу блокировать выполнение?
Григорий Кислин
44
Не используйте это решение, поскольку оно нарушает всю цель использования ExecutorService. ExecutorService - это механизм асинхронного выполнения, способный выполнять задачи в фоновом режиме. Если вы вызываете future.get () сразу после выполнения, он будет блокировать вызывающий поток, пока задача не будет завершена.
user1801374
2
Это решение не должно быть так высоко оценено. Future.get () работает синхронно и будет действовать как блокировщик до тех пор, пока Runnable или Callable не будут выполнены, и, как указано выше, не
Super Hans
2
Как отметил #nhylated, это заслуживает jdk BUG. Если Future.get () не вызывается, любое необработанное исключение из Callable молча игнорируется. Очень плохой дизайн .... просто потратил 1 день, чтобы выяснить, какая библиотека использовала это, и jdk молча игнорировал исключения. И это все еще существует в jdk12.
Бен Цзян
18

Объяснение этого поведения прямо в javadoc для afterExecute :

Примечание. Когда действия включены в задачи (например, FutureTask) либо явно, либо с помощью таких методов, как submit, эти объекты задач перехватывают и поддерживают вычислительные исключения, поэтому они не вызывают внезапного завершения, а внутренние исключения не передаются этому методу. ,

Дрю Уиллс
источник
10

Я обошел его, обернув прилагаемый runnable, представленный исполнителю.

CompletableFuture.runAsync(() -> {
        try {
              runnable.run();
        } catch (Throwable e) {
              Log.info(Concurrency.class, "runAsync", e);
        }
}, executorService);
ммм
источник
3
Вы можете улучшить читаемость, используя whenComplete()метод CompletableFuture.
Эдуард Вирч,
@EduardWirch это работает, но вы не можете выбросить исключение из whenComplete ()
Акшат
7

Я использую VerboseRunnableкласс из jcabi-log , который проглатывает все исключения и регистрирует их. Очень удобно, например:

import com.jcabi.log.VerboseRunnable;
scheduler.scheduleWithFixedDelay(
  new VerboseRunnable(
    Runnable() {
      public void run() { 
        // the code, which may throw
      }
    },
    true // it means that all exceptions will be swallowed and logged
  ),
  1, 1, TimeUnit.MILLISECONDS
);
yegor256
источник
3

Другим решением будет использование ManagedTask и ManagedTaskListener .

Вам нужен Callable или Runnable, который реализует интерфейс ManagedTask .

Метод getManagedTaskListenerвозвращает нужный вам экземпляр.

public ManagedTaskListener getManagedTaskListener() {

И вы реализуете в ManagedTaskListener по taskDoneметоду:

@Override
public void taskDone(Future<?> future, ManagedExecutorService executor, Object task, Throwable exception) {
    if (exception != null) {
        LOGGER.log(Level.SEVERE, exception.getMessage());
    }
}

Подробнее о жизненном цикле управляемой задачи и слушателе .

CSchulz
источник
2

Это работает

  • Он получен из SingleThreadExecutor, но вы можете легко адаптировать его
  • Java 8 код lamdas, но легко исправить

Это создаст Исполнителя с одним потоком, который может получить много задач; и будет ждать, пока текущий завершит выполнение, чтобы начать со следующего

В случае ошибки или исключения uncaughtExceptionHandler его поймает

открытый финальный класс SingleThreadExecutorWithExceptions {

    public static ExecutorService newSingleThreadExecutorWithExceptions (final Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {

        ThreadFactory factory = (Runnable runnable) -> {
            окончательный поток newThread = новый поток (runnable, "SingleThreadExecutorWithExceptions");
            newThread.setUncaughtExceptionHandler ((конечный поток, caugthThread, последний Throwable throwable) -> {
                uncaughtExceptionHandler.uncaughtException (caugthThread, throwable);
            });
            вернуть newThread;
        };
        вернуть новый FinalizableDelegatedExecutorService
                (новый ThreadPoolExecutor (1, 1,
                        0L, TimeUnit.MILLISECONDS,
                        новый LinkedBlockingQueue (),
                        завод) {


                    Защищенная пустота afterExecute (Runnable, Runnable, Throwable throwable) {
                        super.afterExecute (работоспособный, бросаемый);
                        if (throwable == null && runnable instanceof Future) {
                            пытаться {
                                Будущее будущее = (Будущее) работоспособный;
                                if (future.isDone ()) {
                                    future.get ();
                                }
                            } catch (CancellationException ce) {
                                бросаемый = ce;
                            } catch (ExecutionException ee) {
                                throwable = ee.getCause ();
                            } catch (InterruptedException ie) {
                                Thread.currentThread () прерывание (). // игнорируем / сбрасываем
                            }
                        }
                        if (throwable! = null) {
                            uncaughtExceptionHandler.uncaughtException (Thread.currentThread (), Throwable);
                        }
                    }
                });
    }



    закрытый статический класс FinalizableDelegatedExecutorService
            extends DelegatedExecutorService {
        FinalizableDelegatedExecutorService (ExecutorService executor) {
            супер (исполнитель);
        }
        финал защищенный void () {
            super.shutdown ();
        }
    }

    / **
     * Класс-оболочка, который предоставляет только методы ExecutorService
     * реализации ExecutorService.
     * /
    закрытый статический класс DelegatedExecutorService extends AbstractExecutorService {
        закрытый финал ExecutorService e;
        DelegatedExecutorService (ExecutorService executor) {e = executor; }
        public void execute (команда Runnable) {e.execute (команда); }
        public void shutdown () {e.shutdown (); }
        открытый список shutdownNow () {return e.shutdownNow (); }
        public boolean isShutdown () {return e.isShutdown (); }
        public boolean isTermination () {return e.isTeridity (); }
        public boolean awaitTermination (длительный тайм-аут, блок TimeUnit)
                бросает InterruptedException {
            return e.awaitTermination (timeout, unit);
        }
        public Future submit (Выполняемая задача) {
            возврат e.submit (задача);
        }
        публикация в будущем (вызываемая задача) {
            возврат e.submit (задача);
        }
        public Future submit (Выполняемая задача, T результат) {
            вернуть e.submit (задача, результат);
        }
        общедоступный список> invokeAll (коллекция> задачи)
                бросает InterruptedException {
            return e.invokeAll (задачи);
        }
        общедоступный список> invokeAll (коллекция> задач,
                                             длительный таймаут, блок TimeUnit)
                бросает InterruptedException {
            return e.invokeAll (задачи, время ожидания, единица измерения);
        }
        public T invokeAny (Коллекция> задачи)
                выдает InterruptedException, ExecutionException {
            return e.invokeAny (задачи);
        }
        public T invokeAny (Коллекция> задач,
                               длительный таймаут, блок TimeUnit)
                выдает InterruptedException, ExecutionException, TimeoutException {
            return e.invokeAny (задачи, время ожидания, единица измерения);
        }
    }



    private SingleThreadExecutorWithExceptions () {}
}
obesga_tirant
источник
Использование finalize, к сожалению, немного нестабильно, поскольку оно будет вызываться «позже, когда сборщик мусора его соберет» (или, возможно, не в случае с Thread, не знаю) ...
заберет
1

Если вы хотите отслеживать выполнение задачи, вы можете вращать 1 или 2 потока (может быть, больше в зависимости от нагрузки) и использовать их для получения задач из оболочки ExecutionCompletionService.

Кристиан Ботиза
источник
0

Если вы ExecutorServiceпришли из внешнего источника (то есть невозможно подкласс ThreadPoolExecutorи переопределить afterExecute()), вы можете использовать динамический прокси для достижения желаемого поведения:

public static ExecutorService errorAware(final ExecutorService executor) {
    return (ExecutorService) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
            new Class[] {ExecutorService.class},
            (proxy, method, args) -> {
                if (method.getName().equals("submit")) {
                    final Object arg0 = args[0];
                    if (arg0 instanceof Runnable) {
                        args[0] = new Runnable() {
                            @Override
                            public void run() {
                                final Runnable task = (Runnable) arg0;
                                try {
                                    task.run();
                                    if (task instanceof Future<?>) {
                                        final Future<?> future = (Future<?>) task;

                                        if (future.isDone()) {
                                            try {
                                                future.get();
                                            } catch (final CancellationException ce) {
                                                // Your error-handling code here
                                                ce.printStackTrace();
                                            } catch (final ExecutionException ee) {
                                                // Your error-handling code here
                                                ee.getCause().printStackTrace();
                                            } catch (final InterruptedException ie) {
                                                Thread.currentThread().interrupt();
                                            }
                                        }
                                    }
                                } catch (final RuntimeException re) {
                                    // Your error-handling code here
                                    re.printStackTrace();
                                    throw re;
                                } catch (final Error e) {
                                    // Your error-handling code here
                                    e.printStackTrace();
                                    throw e;
                                }
                            }
                        };
                    } else if (arg0 instanceof Callable<?>) {
                        args[0] = new Callable<Object>() {
                            @Override
                            public Object call() throws Exception {
                                final Callable<?> task = (Callable<?>) arg0;
                                try {
                                    return task.call();
                                } catch (final Exception e) {
                                    // Your error-handling code here
                                    e.printStackTrace();
                                    throw e;
                                } catch (final Error e) {
                                    // Your error-handling code here
                                    e.printStackTrace();
                                    throw e;
                                }
                            }
                        };
                    }
                }
                return method.invoke(executor, args);
            });
}
бас
источник
0

Это происходит из - за AbstractExecutorService :: submitоборачивает свой runnableINTO RunnableFuture(ничего , кроме FutureTask) , как показано ниже

AbstractExecutorService.java

public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null); /////////HERE////////
    execute(ftask);
    return ftask;
}

Затем executeпередам его Workerи Worker.run()позвоним ниже.

ThreadPoolExecutor.java

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();           /////////HERE////////
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

Наконец, task.run();в приведенном выше коде вызов будет позвонить FutureTask.run(). Вот код обработчика исключений, поэтому вы НЕ получаете ожидаемое исключение.

class FutureTask<V> implements RunnableFuture<V>

public void run() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {   /////////HERE////////
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}
Канагавелу Сугамар
источник
0

Это похоже на решение МММ, но немного более понятно. Пусть ваши задачи расширяют абстрактный класс, заключающий в себе метод run ().

public abstract Task implements Runnable {

    public abstract void execute();

    public void run() {
      try {
        execute();
      } catch (Throwable t) {
        // handle it  
      }
    }
}


public MySampleTask extends Task {
    public void execute() {
        // heavy, error-prone code here
    }
}
ccleve
источник
-4

Вместо того чтобы создавать подклассы ThreadPoolExecutor, я бы предоставил ему экземпляр ThreadFactory, который создает новые потоки и предоставляет им UncaughtExceptionHandler.

Kevin
источник
3
Я тоже это попробовал, но метод uncaughtException, кажется, никогда не вызывается. Я считаю, что это потому, что рабочий поток в классе ThreadPoolExecutor перехватывает исключения.
Том
5
Метод uncaughtException не вызывается, потому что метод submit ExecutorService оборачивает Callable / Runnable в будущем; исключение фиксируется там.
Эмиль Сит