Служба Java Stop Executor, если одна из назначенных ему задач по какой-либо причине не выполняется

12

Мне нужен какой-то сервис, который будет запускать несколько задач одновременно и с интервалом в 1 секунду в течение 1 минуты.

Если одна из задач не выполняется, я хочу остановить службу и все задачи, которые выполнялись вместе с каким-то индикатором, что что-то пошло не так, иначе, если через одну минуту все прошло хорошо, служба остановится с индикатором, что все прошло хорошо.

Например, у меня есть 2 функции:

Runnable task1 = ()->{
      int num = Math.rand(1,100);
      if (num < 5){
          throw new Exception("something went wrong with this task,terminate");
      }
}

Runnable task2 = ()->{
      int num = Math.rand(1,100)
      return num < 50;
}



ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2);
task1schedule = scheduledExecutorService.scheduleAtFixedRate(task1, 1, 60, TimeUnit.SECONDS);
task2schedule = scheduledExecutorService.scheduleAtFixedRate(task2, 1, 60, TimeUnit.SECONDS);

if (!task1schedule || !task2schedule) scheduledExecutorService.shutdown();

Любые идеи о том, как я должен заняться этим и сделать вещи как можно более общими?

totothegreat
источник
1
Мало что, кроме реального вопроса, Math.randне является встроенным API. Реализация Runnableдолжна иметь void runопределение. Тип task1/2scheduleбудет ScheduledFuture<?>в предоставленном контексте. Переходя к актуальному вопросу, как это использовать awaitTermination? Вы могли бы сделать это как scheduledExecutorService.awaitTermination(1,TimeUnit.MINUTES);. Кроме того , что о проверке , если любой из задач , был отменен до его нормального завершения: if (task1schedule.isCancelled() || task2schedule.isCancelled()) scheduledExecutorService.shutdown();?
Наман
2
Нет смысла планировать задачи, которые будут повторяться каждую минуту, но, скажем, вы хотите остановить задачи, «если через одну минуту все прошло хорошо». Поскольку вы останавливаете исполнителя в любом случае, планирование задачи, которая завершает работу исполнителя через одну минуту, является тривиальным. А фьючерсы уже указывают на то, что что-то пошло не так или нет. Вы не сказали, какой другой тип индикатора вы хотите.
Хольгер

Ответы:

8

Идея состоит в том, что задачи проталкиваются к общему объекту TaskCompleteEvent. Если они выдвигают ошибку, планировщик останавливается, и все задачи останавливаются.

Вы можете проверить результаты каждой итерации задачи в картах «ошибки» и «успех».

public class SchedulerTest {

    @Test
    public void scheduler() throws InterruptedException {
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2);
        TaskCompleteEvent taskCompleteEvent = new TaskCompleteEvent(scheduledExecutorService);
        Runnable task1 = () -> {
            int num = new Random().nextInt(100);
            if (num < 5) {
                taskCompleteEvent.message("task1-"+UUID.randomUUID().toString(), "Num "+num+" was obatined. Breaking all the executions.", true);
            }
        };
        Runnable task2 = () -> {
            int num = new Random().nextInt(100);
            taskCompleteEvent.message("task2-"+UUID.randomUUID().toString(), num < 50, false);
        };
        scheduledExecutorService.scheduleAtFixedRate(task1, 0, 1, TimeUnit.SECONDS);
        scheduledExecutorService.scheduleAtFixedRate(task2, 0, 1, TimeUnit.SECONDS);
        scheduledExecutorService.awaitTermination(60, TimeUnit.SECONDS);
        System.out.println("Success: "+taskCompleteEvent.getSuccess());
        System.out.println("Errors: "+taskCompleteEvent.getErrors());
        System.out.println("Went well?: "+taskCompleteEvent.getErrors().isEmpty());
    }

    public static class TaskCompleteEvent {

        private final ScheduledExecutorService scheduledExecutorService;
        private final Map<String, Object> errors = new LinkedHashMap<>();
        private final Map<String, Object> success = new LinkedHashMap<>();

        public TaskCompleteEvent(ScheduledExecutorService scheduledExecutorService) {
            this.scheduledExecutorService = scheduledExecutorService;
        }

        public synchronized void message(String id, Object response, boolean error) {
            if (error) {
                errors.put(id, response);
                scheduledExecutorService.shutdown();
            } else {
                success.put(id, response);
            }
        }

        public synchronized Map<String, Object> getErrors() {
            return errors;
        }

        public synchronized Map<String, Object> getSuccess() {
            return success;
        }

    }

}
ravenskater
источник
2

Вам просто нужно добавить дополнительную задачу, задачей которой является наблюдение за всеми остальными запущенными задачами - и, когда любая из отслеживаемых задач дает сбой, им необходимо установить семафор (флаг), который ассасин может проверять.

    ScheduledExecutorService executor = (ScheduledExecutorService) Executors.newScheduledThreadPool(2);

    // INSTANTIATE THE REMOTE-FILE-MONITOR:
    RemoteFileMonitor monitor = new RemoteFileMonitor(remotesource, localtarget);

    // THIS TimerTask PERIODICALLY TRIGGERS THE RemoteFileMonitor: 
    TimerTask remote = new TimerTask() {

        // RUN FORREST... RUN !
        public void run() {

            try { 

                kae.trace("TimerTask::run() --> Calling RemoteFileMonitor.check()");
                monitor.check();

            } catch (Exception ex) {

                // NULL TRAP: ALLOWS US TO CONTINUE AND RETRY:

            }

        }

    };

    // THIS TimerTask PERIODICALLY TRIES TO KILL THE REMOTE-FILE-MONITOR:
    TimerTask assassin = new TimerTask() {

        // WHERE DO BAD FOLKS GO WHEN THEY DIE ? 
        private final LocalDateTime death = LocalDateTime.now().plus(ConfigurationOptions.getPollingCycleTime(), ChronoUnit.MINUTES);

        // RUN FORREST... RUN !
        public void run() {

            // IS THERE LIFE AFTER DEATH ???
            if (LocalDateTime.now().isAfter(death)) {

                // THEY GO TO A LAKE OF FIRE AND FRY:
                kae.error(ReturnCode.MONITOR_POLLING_CYCLE_EXCEEDED);                   

            }

        }

    };

    // SCHEDULE THE PERIODIC EXECUTION OF THE RemoteFileMonitor: (remote --> run() monitor --> check())
    executor.scheduleAtFixedRate(remote, delay, interval, TimeUnit.MINUTES);

    // SCHEDULE PERIODIC ASSASSINATION ATTEMPTS AGAINST THE RemoteFileMonitor: (assassin --> run() --> after death --> die())
    executor.scheduleAtFixedRate(assassin, delay, 60L, TimeUnit.SECONDS);

    // LOOP UNTIL THE MONITOR COMPLETES:
    do {

        try {

            // I THINK I NEED A NAP:
            Thread.sleep(interval * 10);                

        } catch (InterruptedException e) {

            // FAIL && THEN cleanexit();
            kae.error(ReturnCode.MONITORING_ERROR, "Monitoring of the XXXXXX-Ingestion site was interrupted");

        }

        // NOTE: THE MONITOR IS SET TO 'FINISHED' WHEN THE DONE-File IS DELIVERED AND RETRIEVED:
    } while (monitor.isNotFinished());

    // SHUTDOWN THE MONITOR TASK:
    executor.shutdown();
Грег Патнуде
источник
2
Класс TimerTaskсовершенно не связан с ScheduledExecutorService; это просто случилось, чтобы реализовать Runnable. Кроме того, нет смысла планировать периодическую задачу, просто чтобы проверить, было ли достигнуто определенное время ( ConfigurationOptions.getPollingCycleTime()). У вас есть ScheduledExecutorService, так что вы можете сказать ему, чтобы запланировать задачу прямо на желаемое время.
Хольгер
Реализация в примере, который я использовал, состояла в том, чтобы убить выполняющуюся задачу через определенный промежуток времени, если задача еще не завершена. Вариант использования: если удаленный сервер не удалил файл в течение 2 часов - убейте задачу. это то, что попросил ОП.
Грег Патнуде
Вы прочитали и поняли мой комментарий? Неважно, что делает код, он использует беспричинный класс без причины, просто замените TimerTaskна, Runnableи вы исправили проблему, не меняя того, что делает код. Кроме того, просто используйте, executor.schedule(assassin, ConfigurationOptions.getPollingCycleTime(), ChronoUnit.MINUTES);и он будет запущен один раз в нужное время, следовательно, if(LocalDateTime.now().isAfter(death))проверка устарела. Опять же, это не меняет того, что делает код, кроме того, что делает его существенно проще и эффективнее.
Хольгер