Я пишу приложение, которое имеет 5 потоков, которые одновременно получают некоторую информацию из Интернета и заполняют 5 разных полей в классе буфера.
Мне нужно проверить данные буфера и сохранить их в базе данных, когда все потоки завершат свою работу.
Как я могу это сделать (получать предупреждения, когда все потоки завершили свою работу)?
94
Ответы:
Подход, который я применяю, заключается в использовании ExecutorService для управления пулами потоков.
ExecutorService es = Executors.newCachedThreadPool(); for(int i=0;i<5;i++) es.execute(new Runnable() { /* your task */ }); es.shutdown(); boolean finished = es.awaitTermination(1, TimeUnit.MINUTES); // all tasks have finished or the time has been reached.
источник
while(!es.awaitTermination(1, TimeUnit.MINUTES));
es.shutdown();
? что, если я напишу код, в котором я выполнил поток, используяes.execute(runnableObj_ZipMaking);
вtry
блоке, и в которомfinally
я вызвалboolean finshed = es.awaitTermination(10, TimeUnit.MINUTES);
. Итак, я полагаю, это должно подождать, пока все потоки не завершат свою работу или не истечет время ожидания (что бы ни было первым). Верно ли мое предположение? или звонокshutdown()
обязательно?Можно
join
в темы. Соединение блокируется до завершения потока.for (Thread thread : threads) { thread.join(); }
Обратите внимание, что
join
бросаетInterruptedException
. Вам нужно будет решить, что делать, если это произойдет (например, попробуйте отменить другие потоки, чтобы предотвратить ненужную работу).источник
t.join();
означает, что текущий поток блокируется до тех пор, пока поток неt
завершится. Это не влияет на потокt
.Взгляните на различные решения.
join()
API был представлен в ранних версиях Java. Несколько хороших альтернатив доступны с этим параллельным пакетом, начиная с выпуска JDK 1.5.ExecutorService # invokeAll ()
Обратитесь к этому связанному вопросу SE для примера кода:
Как использовать invokeAll (), чтобы весь пул потоков выполнял свою задачу?
CountDownLatch
Обратитесь к этому вопросу для использования
CountDownLatch
Как дождаться потока, который порождает свой собственный поток?
ForkJoinPool или newWorkStealingPool () в исполнителях
Перебрать все объекты Future, созданные после отправки в
ExecutorService
источник
Помимо
Thread.join()
предложенных другими, java 5 представила среду исполнителя. Там сThread
объектами не работаешь . Вместо этого вы отправляете свои объектыCallable
илиRunnable
объекты исполнителю. Есть специальный исполнитель, который предназначен для выполнения нескольких задач и возврата их результатов не по порядку. ЭтоExecutorCompletionService
:ExecutorCompletionService executor; for (..) { executor.submit(Executors.callable(yourRunnable)); }
Затем вы можете повторно вызывать
take()
до тех пор, пока не останется большеFuture<?>
объектов для возврата, что означает, что все они будут завершены.Еще одна вещь, которая может быть актуальной, в зависимости от вашего сценария
CyclicBarrier
.источник
executor.submit
возвращаетFuture<?>
. Я бы добавил эти фьючерсы в список, а затем пролистал список, вызываяget
каждое будущее.Executors
, например,Executors.newCachedThreadPool
(или аналогичный)Другая возможность - это
CountDownLatch
объект, который полезен для простых ситуаций: поскольку вы заранее знаете количество потоков, вы инициализируете его соответствующим счетчиком и передаете ссылку на объект каждому потоку.По завершении своей задачи каждый поток вызывает,
CountDownLatch.countDown()
который уменьшает внутренний счетчик. Основной поток после запуска всех остальных должен выполнитьCountDownLatch.await()
вызов блокировки. Он будет выпущен, как только внутренний счетчик достигнет 0.Обратите внимание, что этим предметом
InterruptedException
тоже можно бросить.источник
Подождите / заблокируйте основной поток, пока другие потоки не завершат свою работу.
Как
@Ravindra babu
сказано, этого можно достичь разными способами, но покажу на примерах.java.lang.Thread. join () Начиная с версии 1.0
public static void joiningThreads() throws InterruptedException { Thread t1 = new Thread( new LatchTask(1, null), "T1" ); Thread t2 = new Thread( new LatchTask(7, null), "T2" ); Thread t3 = new Thread( new LatchTask(5, null), "T3" ); Thread t4 = new Thread( new LatchTask(2, null), "T4" ); // Start all the threads t1.start(); t2.start(); t3.start(); t4.start(); // Wait till all threads completes t1.join(); t2.join(); t3.join(); t4.join(); }
java.util.concurrent.CountDownLatch Начиная с версии 1.5
.countDown()
«Уменьшает количество защелок..await()
«Методы ожидания блокируются, пока текущий счетчик не достигнет нуля.Если вы создали,
latchGroupCount = 4
тоcountDown()
следует вызвать 4 раза, чтобы получить счет 0. Таким образом, этоawait()
освободит блокирующие потоки.public static void latchThreads() throws InterruptedException { int latchGroupCount = 4; CountDownLatch latch = new CountDownLatch(latchGroupCount); Thread t1 = new Thread( new LatchTask(1, latch), "T1" ); Thread t2 = new Thread( new LatchTask(7, latch), "T2" ); Thread t3 = new Thread( new LatchTask(5, latch), "T3" ); Thread t4 = new Thread( new LatchTask(2, latch), "T4" ); t1.start(); t2.start(); t3.start(); t4.start(); //latch.countDown(); latch.await(); // block until latchGroupCount is 0. }
Пример кода класса Threaded
LatchTask
. Для тестирования подхода используйтеjoiningThreads();
иlatchThreads();
метод main.class LatchTask extends Thread { CountDownLatch latch; int iterations = 10; public LatchTask(int iterations, CountDownLatch latch) { this.iterations = iterations; this.latch = latch; } @Override public void run() { String threadName = Thread.currentThread().getName(); System.out.println(threadName + " : Started Task..."); for (int i = 0; i < iterations; i++) { System.out.println(threadName + " : " + i); MainThread_Wait_TillWorkerThreadsComplete.sleep(1); } System.out.println(threadName + " : Completed Task"); // countDown() « Decrements the count of the latch group. if(latch != null) latch.countDown(); } }
Например, обратитесь к этому классу Concurrent_ParallelNotifyies .CyclicBarrier barrier = new CyclicBarrier(3); barrier.await();
Фреймворк Executer: мы можем использовать ExecutorService для создания пула потоков и отслеживать ход выполнения асинхронных задач с помощью Future.
submit(Runnable)
,submit(Callable)
которые возвращают Future Object. Используяfuture.get()
функцию, мы можем заблокировать основной поток до тех пор, пока рабочие потоки не завершат свою работу.invokeAll(...)
- возвращает список объектов Future, с помощью которых вы можете получить результаты выполнения каждого Callable.Найдите пример использования интерфейсов Runnable, Callable с платформой Executor.
@Смотрите также
источник
Ты сделаешь
for (Thread t : new Thread[] { th1, th2, th3, th4, th5 }) t.join()
После этого цикла for вы можете быть уверены, что все потоки завершили свою работу.
источник
Сохраните объекты Thread в некоторой коллекции (например, List или Set), затем выполните цикл по коллекции после запуска потоков и вызовите join () для потоков.
источник
Для этой цели вы можете использовать метод Threadf # join .
источник
Хотя это не относится к проблеме OP, если вас интересует синхронизация (точнее, рандеву) с ровно одним потоком, вы можете использовать Exchanger.
В моем случае мне нужно было приостановить родительский поток, пока дочерний поток что-то не сделает, например, завершит свою инициализацию. CountDownLatch также работает хорошо.
источник
Службу исполнителя можно использовать для управления несколькими потоками, включая статус и завершение. См. Http://programmingexamples.wikidot.com/executorservice
источник
попробуйте это, будет работать.
Thread[] threads = new Thread[10]; List<Thread> allThreads = new ArrayList<Thread>(); for(Thread thread : threads){ if(null != thread){ if(thread.isAlive()){ allThreads.add(thread); } } } while(!allThreads.isEmpty()){ Iterator<Thread> ite = allThreads.iterator(); while(ite.hasNext()){ Thread thread = ite.next(); if(!thread.isAlive()){ ite.remove(); } } }
источник
У меня была аналогичная проблема, и в итоге я использовал Java 8 parallelStream.
Это супер просто и удобно. За кулисами он использует пул соединений fork JVM по умолчанию, что означает, что он будет ждать завершения всех потоков, прежде чем продолжить. В моем случае это было отличное решение, потому что это был единственный параллельный поток в моем приложении. Если у вас одновременно работает более одного parallelStream, прочтите ссылку ниже.
Подробнее о параллельных потоках здесь .
источник
Я создал небольшой вспомогательный метод, чтобы дождаться завершения нескольких потоков:
public static void waitForThreadsToFinish(Thread... threads) { try { for (Thread thread : threads) { thread.join(); } } catch (InterruptedException e) { e.printStackTrace(); } }
источник
В существующих ответах сказано, что
join()
каждый поток.Но есть несколько способов получить массив / список потоков:
ThreadGroup
для управления потоками.Следующий код будет использовать этот
ThreadGruop
подход. Сначала он создает группу, затем при создании каждого потока указывает группу в конструкторе, позже можно получить массив потоков черезThreadGroup.enumerate()
Код
SyncBlockLearn.java
import org.testng.Assert; import org.testng.annotations.Test; /** * synchronized block - learn, * * @author eric * @date Apr 20, 2015 1:37:11 PM */ public class SyncBlockLearn { private static final int TD_COUNT = 5; // thread count private static final int ROUND_PER_THREAD = 100; // round for each thread, private static final long INC_DELAY = 10; // delay of each increase, // sync block test, @Test public void syncBlockTest() throws InterruptedException { Counter ct = new Counter(); ThreadGroup tg = new ThreadGroup("runner"); for (int i = 0; i < TD_COUNT; i++) { new Thread(tg, ct, "t-" + i).start(); } Thread[] tArr = new Thread[TD_COUNT]; tg.enumerate(tArr); // get threads, // wait all runner to finish, for (Thread t : tArr) { t.join(); } System.out.printf("\nfinal count: %d\n", ct.getCount()); Assert.assertEquals(ct.getCount(), TD_COUNT * ROUND_PER_THREAD); } static class Counter implements Runnable { private final Object lkOn = new Object(); // the object to lock on, private int count = 0; @Override public void run() { System.out.printf("[%s] begin\n", Thread.currentThread().getName()); for (int i = 0; i < ROUND_PER_THREAD; i++) { synchronized (lkOn) { System.out.printf("[%s] [%d] inc to: %d\n", Thread.currentThread().getName(), i, ++count); } try { Thread.sleep(INC_DELAY); // wait a while, } catch (InterruptedException e) { e.printStackTrace(); } } System.out.printf("[%s] end\n", Thread.currentThread().getName()); } public int getCount() { return count; } } }
Основной поток будет ждать завершения всех потоков в группе.
источник
Используйте это в своем основном потоке: while (! Executor.isTerminated ()); Поместите эту строку кода после запуска всех потоков из службы исполнителя. Это запустит основной поток только после того, как все потоки, запущенные исполнителями, будут завершены. Обязательно вызовите executeor.shutdown (); перед указанным выше циклом.
источник