Как ждать завершения всех потоков, используя ExecutorService?

381

Мне нужно выполнить некоторое количество задач 4 за один раз, что-то вроде этого:

ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
    taskExecutor.execute(new MyTask());
}
//...wait for completion somehow

Как я могу получить уведомление, когда все они будут завершены? Пока я не могу думать о чем-то лучше, чем установить какой-либо глобальный счетчик задач и уменьшить его в конце каждой задачи, а затем отслеживать в бесконечном цикле этот счетчик, чтобы он стал 0; или получить список фьючерсов и в бесконечном цикле монитора isDone для всех из них. Каковы лучшие решения без бесконечных циклов?

Спасибо.

Serg
источник

Ответы:

446

В основном ExecutorServiceвы звоните, shutdown()а затем awaitTermination():

ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
  taskExecutor.execute(new MyTask());
}
taskExecutor.shutdown();
try {
  taskExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
  ...
}
Клетус
источник
9
это именно то, для чего предназначены shutdown / awaitTermination
Matt B
31
Это хорошая модель, если эта задача является одноразовым событием. Однако, если это делается несколько раз в течение одной и той же среды выполнения, это не оптимально, поскольку вы будете создавать и прерывать потоки несколько раз при каждом его выполнении.
sjlee
44
Я ищу любую официальную документацию, которая Long.MAX_VALUE, TimeUnit.NANOSECONDSэквивалентна отсутствию тайм-аута.
Сэм Харвелл
15
Я не могу поверить, что вы должны использовать shutdown для того, чтобы присоединиться ко всем текущим потокам (после завершения shutdown вы не сможете снова использовать executor). Предложите вместо этого использовать список Future's ...
rogerdpack
20
@SamHarwell см. Документацию к java.util.concurrentпакету в разделе: Чтобы ждать «навсегда», вы можете использовать значениеTimingLong.MAX_VALUE
белухин
174

Используйте CountDownLatch :

CountDownLatch latch = new CountDownLatch(totalNumberOfTasks);
ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
  taskExecutor.execute(new MyTask());
}

try {
  latch.await();
} catch (InterruptedException E) {
   // handle
}

и в рамках вашей задачи (приложите в try / finally)

latch.countDown();
ChssPly76
источник
3
Там нет 4 задач. Есть "некоторое количество задач", выполненных 4 за один раз.
Клет
2
Извините, я неправильно понял вопрос. Да, количество задач должно быть аргументом для конструктора
CountDownLatch
3
Я нахожу это решение более элегантным, чем другие, похоже, что оно было сделано для этой цели, и оно простое и понятное.
wvdschel
3
Что делать, если вы не знаете количество задач перед началом?
Клет
11
@cletus - тогда вы не используете CountDownLatch :-) Имейте в виду, я не утверждаю, что этот подход лучше, чем ваш. Тем не менее, я обнаружил , что в реальных жизненных сценариев , я действительно знаю , количество задач, настройки пула потоков действительно нужно быть настраиваемым в развертывании и бассейны могут быть использованы повторно. Поэтому я обычно использую пулы потоков, внедряемые Spring, и устанавливаю их в качестве прототипов, а закрытие их вручную только для того, чтобы дождаться завершения потоков, кажется далеко не идеальным.
ChssPly76
82

ExecutorService.invokeAll() делает это для вас.

ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
List<Callable<?>> tasks; // your tasks
// invokeAll() returns when all tasks are complete
List<Future<?>> futures = taskExecutor.invokeAll(tasks);
sjlee
источник
Трудность наступает, если / когда у вас есть начало «4» потоков по одному,
пошагово
@rogerdpack: я все еще учу этих исполнителей и все такое. Но в ответ на то, что вы спрашиваете. Должны ли 4 потока одновременно не быть частью пакетной задачи, которая выполняется с использованием ответа выше?
Мукул Гоэль
3
Этот метод будет работать только в том случае, если вы знаете количество заданий заранее.
Константин
3
Я думаю, что когда futuresони возвращаются, задачи не были выполнены. Они могут завершиться в будущем, и у вас будет ссылка на результат. Вот почему это называется Future. У вас есть метод Future.get () , который будет ожидать завершения задачи, чтобы получить результат.
Алик Эльзин-килака
5
@ AlikElzin-kilaka Цитата из JavaDocs (ссылка в ответе): «Выполняет заданные задачи, возвращая список фьючерсов, содержащих их статус и результаты, когда все завершено. Future.isDone () имеет значение true для каждого элемента возвращаемого списка. "
Халк
47

Вы также можете использовать списки фьючерсов:

List<Future> futures = new ArrayList<Future>();
// now add to it:
futures.add(executorInstance.submit(new Callable<Void>() {
  public Void call() throws IOException {
     // do something
    return null;
  }
}));

затем, когда вы хотите присоединиться ко всем из них, это, по сути, эквивалентно объединению каждого из них (с дополнительным преимуществом в том, что оно вновь вызывает исключения из дочерних потоков в основной):

for(Future f: this.futures) { f.get(); }

По сути, уловка заключается в том, чтобы вызывать .get () для каждого Future по одному, вместо бесконечного зацикливания, вызывая isDone () on (все или каждый). Таким образом, вы гарантированно «продвинетесь» через и через этот блок, как только закончится последний поток. Предостережение заключается в том, что, так как вызов .get () повторно вызывает исключения, если один из потоков умирает, вы могли бы подняться из этого, возможно, до того, как другие потоки закончат работу до завершения [чтобы избежать этого, вы могли бы добавить catch ExecutionExceptionвызов get ]. Другое предостережение заключается в том, что он сохраняет ссылку на все потоки, поэтому, если у них есть локальные переменные потока, они не будут собираться до тех пор, пока вы не пройдете этот блок (хотя, возможно, вы сможете обойти это, если это станет проблемой, удалив Будущее вне ArrayList). Если вы хотите знать, какое будущее "заканчивается первым"https://stackoverflow.com/a/31885029/32453

rogerdpack
источник
3
Чтобы узнать, что «заканчивается первым», используйте ExecutorCompletionService.take: stackoverflow.com/a/11872604/199364
ToolmakerSteve
34

В Java8 вы можете сделать это с CompletableFuture :

ExecutorService es = Executors.newFixedThreadPool(4);
List<Runnable> tasks = getTasks();
CompletableFuture<?>[] futures = tasks.stream()
                               .map(task -> CompletableFuture.runAsync(task, es))
                               .toArray(CompletableFuture[]::new);
CompletableFuture.allOf(futures).join();    
es.shutdown();
AdamSkywalker
источник
3
Это очень элегантное решение.
Mattvonb
ExecutorService es = Executors.newFixedThreadPool(4); List< Future<?>> futures = new ArrayList<>(); for(Runnable task : taskList) { futures.add(es.submit(task)); } for(Future<?> future : futures) { try { future.get(); }catch(Exception e){ // do logging and nothing else } }
user2862544
@AdamSkywalker необходим ли awaitTermination () после es.shutdown ()?
Gaurav
@gaurav, когда вы вызываете shutdown, некоторые задачи могут быть еще не завершены. Поэтому awaitTermination будет блокировать вызывающий поток, пока все не будет сделано. Это зависит от того, нужно ли вам ждать результатов в этой теме или нет.
AdamSkywalker
@AdamSkywalker отличный ответ. имеет смысл не вызывать awaitTermination (), если мне не нужно ждать результатов.
Gaurav
26

Просто мои два цента. Чтобы преодолеть требование CountDownLatchзаранее знать количество задач, вы можете сделать это по старинке, используя простую Semaphore.

ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
int numberOfTasks=0;
Semaphore s=new Semaphore(0);
while(...) {
    taskExecutor.execute(new MyTask());
    numberOfTasks++;
}

try {
    s.aquire(numberOfTasks);
...

В вашей задаче просто позвоните, s.release()как вы быlatch.countDown();

stryba
источник
Увидев это, я сначала подумал, не будет ли проблем, если некоторые releaseвызовы произойдут до acquireвызова, но после прочтения документации Семафора я вижу, что все в порядке.
ToolmakerSteve
13

Немного опоздал к игре, но ради завершения ...

Вместо того, чтобы «ждать» выполнения всех заданий, вы можете думать по принципу Голливуда «не звони мне, я позвоню тебе» - когда я закончу. Я думаю, что полученный код более элегантный ...

Гуава предлагает несколько интересных инструментов для достижения этой цели.

Пример ::

Обернуть ExecutorService в ListeningExecutorService ::

ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));

Отправить коллекцию вызовов для исполнения ::

for (Callable<Integer> callable : callables) {
  ListenableFuture<Integer> lf = service.submit(callable);
  // listenableFutures is a collection
  listenableFutures.add(lf)
});

Теперь основная часть:

ListenableFuture<List<Integer>> lf = Futures.successfulAsList(listenableFutures);

Прикрепите обратный вызов к ListenableFuture, который вы можете использовать, чтобы получать уведомления, когда все фьючерсы завершены:

        Futures.addCallback(lf, new FutureCallback<List<Integer>>() {
        @Override
        public void onSuccess(List<Integer> result) {
            log.info("@@ finished processing {} elements", Iterables.size(result));
            // do something with all the results
        }

        @Override
        public void onFailure(Throwable t) {
            log.info("@@ failed because of :: {}", t);
        }
    });

Это также дает то преимущество, что вы можете собрать все результаты в одном месте после завершения обработки ...

Больше информации здесь

Разван Петруеску
источник
2
Очень чистый. Работает без нареканий даже на Android. Просто пришлось использовать runOnUiThread()в onSuccess().
Дшмидт
12

Класс CyclicBarrier в Java 5 и более поздних версиях предназначен для такого рода вещей.

Пекка Энберг
источник
7
Круто, никогда не могу вспомнить название этой структуры данных. Однако подходит, только если вы заранее знаете количество задач, которые будут поставлены в очередь.
ᆼ ᆺ ᆼ
да, вы думаете, что сможете преодолеть барьер с помощью текущего потока и всех дочерних потоков, а затем, пройдя его, вы узнаете, что дочерние потоки завершены ...
rogerdpack,
На самом деле это неправильный ответ. CyclicBarrier предназначен для порций. CountDownLatch предназначен для ожидания события
gstackoverflow
7

Следуйте одному из следующих подходов.

  1. Итерация всех будущих задач, возвращенных из submitна ExecutorServiceи проверить статус с блокировкой вызова get()на Futureобъекте в соответствии с предложениемKiran
  2. Использовать invokeAll()на ExecutorService
  3. CountDownLatch
  4. ForkJoinPool или Executors.html # newWorkStealingPool
  5. Используйте shutdown, awaitTermination, shutdownNowAPI ThreadPoolExecutor в правильной последовательности

Связанные вопросы SE:

Как CountDownLatch используется в многопоточности Java?

Как правильно отключить Java ExecutorService

Равиндра Бабу
источник
6

Здесь есть два варианта, просто запутайте, какой из них лучше всего выбрать.

Опция 1:

ExecutorService es = Executors.newFixedThreadPool(4);
List<Runnable> tasks = getTasks();
CompletableFuture<?>[] futures = tasks.stream()
                               .map(task -> CompletableFuture.runAsync(task, es))
                               .toArray(CompletableFuture[]::new);
CompletableFuture.allOf(futures).join();    
es.shutdown();

Вариант 2:

ExecutorService es = Executors.newFixedThreadPool(4);
List< Future<?>> futures = new ArrayList<>();
for(Runnable task : taskList) {
    futures.add(es.submit(task));
}

for(Future<?> future : futures) {
    try {
        future.get();
    }catch(Exception e){
        // do logging and nothing else
    }
}
es.shutdown();

Здесь положить future.get (); в попытке поймать это хорошая идея, верно?

user2862544
источник
5

Вы можете обернуть ваши задачи в другой runnable, который будет отправлять уведомления:

taskExecutor.execute(new Runnable() {
  public void run() {
    taskStartedNotification();
    new MyTask().run();
    taskFinishedNotification();
  }
});
Zed
источник
1
Мне потребовалось некоторое время, чтобы увидеть, как это решит вопрос ОП. Во-первых, обратите внимание, что эта упаковка относится к каждой задаче, а не к коду, который запускает все задачи. Предположительно, каждый старт будет увеличивать счетчик, а каждый конец будет уменьшать этот счетчик или увеличивать completedсчетчик. Поэтому после их запуска при каждом уведомлении можно было определить, все ли задачи выполнены. Обратите внимание, что крайне важно использовать его try/finallyтак, чтобы готовое уведомление (или альтернативное уведомление в catchблоке) давалось даже в случае сбоя задачи. Иначе бы ждать вечно.
ToolmakerSteve
3

Я только что написал пример программы, которая решает вашу проблему. Краткой реализации не дано, поэтому я добавлю ее. Хотя вы можете использовать executor.shutdown()и executor.awaitTermination(), это не лучшая практика, поскольку время, затрачиваемое разными потоками, было бы непредсказуемым.

ExecutorService es = Executors.newCachedThreadPool();
    List<Callable<Integer>> tasks = new ArrayList<>();

    for (int j = 1; j <= 10; j++) {
        tasks.add(new Callable<Integer>() {

            @Override
            public Integer call() throws Exception {
                int sum = 0;
                System.out.println("Starting Thread "
                        + Thread.currentThread().getId());

                for (int i = 0; i < 1000000; i++) {
                    sum += i;
                }

                System.out.println("Stopping Thread "
                        + Thread.currentThread().getId());
                return sum;
            }

        });
    }

    try {
        List<Future<Integer>> futures = es.invokeAll(tasks);
        int flag = 0;

        for (Future<Integer> f : futures) {
            Integer res = f.get();
            System.out.println("Sum: " + res);
            if (!f.isDone()) 
                flag = 1;
        }

        if (flag == 0)
            System.out.println("SUCCESS");
        else
            System.out.println("FAILED");

    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
Киран
источник
Хорошо, что вы показываете использование future.get - хорошая альтернатива, о которой нужно знать. Но почему вы считаете, что лучше ждать вечно , чем устанавливать максимально допустимое время ожидания? Что еще более важно, нет никакой причины делать всю эту логику, когда можно просто дать действительно, очень долгое время для ожидающего завершения, если вы хотите подождать (по существу, навсегда), пока все задачи не будут выполнены.
ToolmakerSteve
Это ничем не отличается от уже представленных здесь решений. Ваше справедливое решение такое же, как представлено @sjlee
pulp_fiction
Не уверен, почему вам нужно проверять выполненное, если согласно oracle doc, invokeAll будет возвращать только «когда все завершено или истечет время ожидания, в зависимости от того, что наступит раньше»
Mashrur
3

Просто, чтобы предоставить больше альтернатив здесь, чтобы использовать защелки / барьеры. Вы также можете получить частичные результаты, пока все они не закончат использование CompletionService .

Из Java Concurrency на практике: «Если у вас есть пакет вычислений для отправки Исполнителю, и вы хотите получать их результаты по мере их появления, вы можете сохранить Future, связанное с каждой задачей, и многократно запрашивать завершение, вызывая get с Тайм-аут нулевой. Это возможно, но утомительно . К счастью, есть лучший способ : услуга завершения. "

Здесь реализация

public class TaskSubmiter {
    private final ExecutorService executor;
    TaskSubmiter(ExecutorService executor) { this.executor = executor; }
    void doSomethingLarge(AnySourceClass source) {
        final List<InterestedResult> info = doPartialAsyncProcess(source);
        CompletionService<PartialResult> completionService = new ExecutorCompletionService<PartialResult>(executor);
        for (final InterestedResult interestedResultItem : info)
            completionService.submit(new Callable<PartialResult>() {
                public PartialResult call() {
                    return InterestedResult.doAnOperationToGetPartialResult();
                }
        });

    try {
        for (int t = 0, n = info.size(); t < n; t++) {
            Future<PartialResult> f = completionService.take();
            PartialResult PartialResult = f.get();
            processThisSegment(PartialResult);
            }
        } 
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } 
        catch (ExecutionException e) {
            throw somethinghrowable(e.getCause());
        }
    }
}
Альберто Гуррион
источник
3

Это моё решение, основанное на подсказке AdamSkywalker, и оно работает

package frss.main;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TestHilos {

    void procesar() {
        ExecutorService es = Executors.newFixedThreadPool(4);
        List<Runnable> tasks = getTasks();
        CompletableFuture<?>[] futures = tasks.stream().map(task -> CompletableFuture.runAsync(task, es)).toArray(CompletableFuture[]::new);
        CompletableFuture.allOf(futures).join();
        es.shutdown();

        System.out.println("FIN DEL PROCESO DE HILOS");
    }

    private List<Runnable> getTasks() {
        List<Runnable> tasks = new ArrayList<Runnable>();

        Hilo01 task1 = new Hilo01();
        tasks.add(task1);

        Hilo02 task2 = new Hilo02();
        tasks.add(task2);
        return tasks;
    }

    private class Hilo01 extends Thread {

        @Override
        public void run() {
            System.out.println("HILO 1");
        }

    }

    private class Hilo02 extends Thread {

        @Override
        public void run() {
            try {
                sleep(2000);
            }
            catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("HILO 2");
        }

    }


    public static void main(String[] args) {
        TestHilos test = new TestHilos();
        test.procesar();
    }
}
frss-soft.com
источник
2

Вы можете использовать этот код:

public class MyTask implements Runnable {

    private CountDownLatch countDownLatch;

    public MyTask(CountDownLatch countDownLatch {
         this.countDownLatch = countDownLatch;
    }

    @Override
    public void run() {
         try {
             //Do somethings
             //
             this.countDownLatch.countDown();//important
         } catch (InterruptedException ex) {
              Thread.currentThread().interrupt();
         }
     }
}

CountDownLatch countDownLatch = new CountDownLatch(NUMBER_OF_TASKS);
ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
for (int i = 0; i < NUMBER_OF_TASKS; i++){
     taskExecutor.execute(new MyTask(countDownLatch));
}
countDownLatch.await();
System.out.println("Finish tasks");
Туан Фам
источник
2

Я создал следующий рабочий пример. Идея состоит в том, чтобы иметь способ обработать пул задач (я использую очередь в качестве примера) со многими потоками (определяемыми программно числом numberOfTasks / threshold) и ожидать завершения всех потоков, чтобы продолжить какую-то другую обработку.

import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Testing CountDownLatch and ExecutorService to manage scenario where
 * multiple Threads work together to complete tasks from a single
 * resource provider, so the processing can be faster. */
public class ThreadCountDown {

private CountDownLatch threadsCountdown = null;
private static Queue<Integer> tasks = new PriorityQueue<>();

public static void main(String[] args) {
    // Create a queue with "Tasks"
    int numberOfTasks = 2000;
    while(numberOfTasks-- > 0) {
        tasks.add(numberOfTasks);
    }

    // Initiate Processing of Tasks
    ThreadCountDown main = new ThreadCountDown();
    main.process(tasks);
}

/* Receiving the Tasks to process, and creating multiple Threads
* to process in parallel. */
private void process(Queue<Integer> tasks) {
    int numberOfThreads = getNumberOfThreadsRequired(tasks.size());
    threadsCountdown = new CountDownLatch(numberOfThreads);
    ExecutorService threadExecutor = Executors.newFixedThreadPool(numberOfThreads);

    //Initialize each Thread
    while(numberOfThreads-- > 0) {
        System.out.println("Initializing Thread: "+numberOfThreads);
        threadExecutor.execute(new MyThread("Thread "+numberOfThreads));
    }

    try {
        //Shutdown the Executor, so it cannot receive more Threads.
        threadExecutor.shutdown();
        threadsCountdown.await();
        System.out.println("ALL THREADS COMPLETED!");
        //continue With Some Other Process Here
    } catch (InterruptedException ex) {
        ex.printStackTrace();
    }
}

/* Determine the number of Threads to create */
private int getNumberOfThreadsRequired(int size) {
    int threshold = 100;
    int threads = size / threshold;
    if( size > (threads*threshold) ){
        threads++;
    }
    return threads;
}

/* Task Provider. All Threads will get their task from here */
private synchronized static Integer getTask(){
    return tasks.poll();
}

/* The Threads will get Tasks and process them, while still available.
* When no more tasks available, the thread will complete and reduce the threadsCountdown */
private class MyThread implements Runnable {

    private String threadName;

    protected MyThread(String threadName) {
        super();
        this.threadName = threadName;
    }

    @Override
    public void run() {
        Integer task;
        try{
            //Check in the Task pool if anything pending to process
            while( (task = getTask()) != null ){
                processTask(task);
            }
        }catch (Exception ex){
            ex.printStackTrace();
        }finally {
            /*Reduce count when no more tasks to process. Eventually all
            Threads will end-up here, reducing the count to 0, allowing
            the flow to continue after threadsCountdown.await(); */
            threadsCountdown.countDown();
        }
    }

    private void processTask(Integer task){
        try{
            System.out.println(this.threadName+" is Working on Task: "+ task);
        }catch (Exception ex){
            ex.printStackTrace();
        }
    }
}
}

Надеюсь, поможет!

Фернандо Хиль
источник
1

Вы можете использовать свой собственный подкласс ExecutorCompletionService для переноса taskExecutorи свою собственную реализацию BlockingQueue для получения информации о завершении каждой задачи и выполнения любого обратного вызова или другого действия, которое вы пожелаете, когда количество выполненных задач достигает желаемой цели.

Алекс Мартелли
источник
1

Вы должны использовать executorService.shutdown()и executorService.awaitTerminationметод.

Пример следующим образом:

public class ScheduledThreadPoolExample {

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);
        executorService.scheduleAtFixedRate(() -> System.out.println("process task."),
                0, 1, TimeUnit.SECONDS);

        TimeUnit.SECONDS.sleep(10);
        executorService.shutdown();
        executorService.awaitTermination(1, TimeUnit.DAYS);
    }

}
Роллен Холт
источник
awaitTermination () необходимо после shutdown () /
gaurav
1

Поэтому я публикую свой ответ по связанному вопросу здесь, если кто-то хочет более простой способ сделать это

ExecutorService executor = Executors.newFixedThreadPool(10);
CompletableFuture[] futures = new CompletableFuture[10];
int i = 0;
while (...) {
    futures[i++] =  CompletableFuture.runAsync(runner, executor);
}

CompletableFuture.allOf(futures).join(); // THis will wait until all future ready.
Mạnh Quyết Nguyễn
источник
0

Java 8 - Мы можем использовать потоковый API для обработки потока. Пожалуйста, посмотрите фрагмент ниже

final List<Runnable> tasks = ...; //or any other functional interface
tasks.stream().parallel().forEach(Runnable::run) // Uses default pool

//alternatively to specify parallelism 
new ForkJoinPool(15).submit(
          () -> tasks.stream().parallel().forEach(Runnable::run) 
    ).get();
Влад
источник
2
Привет Влад, добро пожаловать в StackOverflow. Можете ли вы отредактировать свой ответ, чтобы объяснить, как это отвечает на вопрос, и что делает код? Код только ответы не рекомендуется здесь. Спасибо!
Тим Мэлоун
Этот пост говорит о параллелизме. Параллелизм! = Параллелизм
GabrielBB
0

ExecutorService WORKER_THREAD_POOL 
  = Executors.newFixedThreadPool(10);
CountDownLatch latch = new CountDownLatch(2);
for (int i = 0; i < 2; i++) {
    WORKER_THREAD_POOL.submit(() -> {
        try {
            // doSomething();
            latch.countDown();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    });
}

// wait for the latch to be decremented by the two remaining threads
latch.await();

Если doSomething()бросить некоторые другие исключения, latch.countDown()кажется, не будет выполняться, так что мне делать?

Пэнфэй Жан
источник
0

если вы используете несколько потоков ExecutionServices ПОСЛЕДОВАТЕЛЬНО и хотите дождаться завершения КАЖДОГО ИСПОЛНИТЕЛЬНОГО ОБСЛУЖИВАНИЯ. Лучший способ как ниже;

ExecutorService executer1 = Executors.newFixedThreadPool(THREAD_SIZE1);
for (<loop>) {
   executer1.execute(new Runnable() {
            @Override
            public void run() {
                ...
            }
        });
} 
executer1.shutdown();

try{
   executer1.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);

   ExecutorService executer2 = Executors.newFixedThreadPool(THREAD_SIZE2);
   for (true) {
      executer2.execute(new Runnable() {
            @Override
            public void run() {
                 ...
            }
        });
   } 
   executer2.shutdown();
} catch (Exception e){
 ...
}
Доктор х
источник
-1

Это может помочь

Log.i(LOG_TAG, "shutting down executor...");
executor.shutdown();
while (true) {
                try {
                    Log.i(LOG_TAG, "Waiting for executor to terminate...");
                    if (executor.isTerminated())
                        break;
                    if (executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
                        break;
                    }
                } catch (InterruptedException ignored) {}
            }
Амол Десаи
источник
-1

Вы можете вызвать waitTillDone () в этом классе Runner :

Runner runner = Runner.runner(4); // create pool with 4 threads in thread pool

while(...) {
    runner.run(new MyTask()); // here you submit your task
}


runner.waitTillDone(); // and this blocks until all tasks are finished (or failed)


runner.shutdown(); // once you done you can shutdown the runner

Вы можете повторно использовать этот класс и вызывать waitTillDone () столько раз, сколько хотите, прежде чем вызывать shutdown (), плюс ваш код очень прост . Кроме того, вы не должны знать о количестве задач заранее.

Чтобы использовать его, просто добавьте эту compile 'com.github.matejtymes:javafixes:1.3.1'зависимость gradle / maven в ваш проект.

Более подробную информацию можно найти здесь:

https://github.com/MatejTymes/JavaFixes

Матей Таймс
источник
-2

В executor есть метод, getActiveCount()который подсчитывает количество активных потоков.

После охвата потока мы можем проверить, является ли activeCount()значение 0. Если значение равно нулю, это означает, что в данный момент нет активных потоков, что означает, что задача завершена:

while (true) {
    if (executor.getActiveCount() == 0) {
    //ur own piece of code
    break;
    }
}
пользователь
источник
3
Не очень хорошая идея, см. Stackoverflow.com/a/7271685/1166992 и javadoc: «Возвращает приблизительное количество потоков, которые активно выполняют задачи».
Оливье Фошо