Как узнать, закончились ли другие потоки?

126

У меня есть объект с методом StartDownload(), который запускает три потока.

Как мне получить уведомление, когда каждый поток завершил выполнение?

Есть ли способ узнать, завершен ли один (или весь) поток или все еще выполняется?

Рикардо Фелгейрас
источник
1
Взгляните на класс
Fortyrunner

Ответы:

228

Есть несколько способов сделать это:

  1. Используйте Thread.join () в своем основном потоке, чтобы блокировать выполнение каждого потока, или
  2. Проверяйте Thread.isAlive () методом опроса - обычно не рекомендуется - чтобы дождаться завершения каждого потока, или
  3. Неортодоксально, для каждого рассматриваемого потока вызовите setUncaughtExceptionHandler, чтобы вызвать метод в вашем объекте, и запрограммируйте каждый поток, чтобы генерировать неперехваченное исключение при его завершении, или
  4. Используйте блокировки, синхронизаторы или механизмы из java.util.concurrent , или
  5. Более ортодоксально, создайте слушателя в своем основном потоке, а затем запрограммируйте каждый из своих потоков, чтобы сообщить слушателю, что они завершены.

Как реализовать Идею №5? Ну, один из способов - сначала создать интерфейс:

public interface ThreadCompleteListener {
    void notifyOfThreadComplete(final Thread thread);
}

затем создайте следующий класс:

public abstract class NotifyingThread extends Thread {
  private final Set<ThreadCompleteListener> listeners
                   = new CopyOnWriteArraySet<ThreadCompleteListener>();
  public final void addListener(final ThreadCompleteListener listener) {
    listeners.add(listener);
  }
  public final void removeListener(final ThreadCompleteListener listener) {
    listeners.remove(listener);
  }
  private final void notifyListeners() {
    for (ThreadCompleteListener listener : listeners) {
      listener.notifyOfThreadComplete(this);
    }
  }
  @Override
  public final void run() {
    try {
      doRun();
    } finally {
      notifyListeners();
    }
  }
  public abstract void doRun();
}

а затем каждый из ваших потоков будет расширяться NotifyingThreadи вместо реализации run()будет реализован doRun(). Таким образом, когда они будут завершены, они автоматически уведомят всех, ожидающих уведомления.

Наконец, в вашем основном классе - том, который запускает все потоки (или, по крайней мере, объект, ожидающий уведомления) - измените этот класс implement ThreadCompleteListenerи сразу после создания каждого потока добавьте себя в список слушателей:

NotifyingThread thread1 = new OneOfYourThreads();
thread1.addListener(this); // add ourselves as a listener
thread1.start();           // Start the Thread

затем при выходе из каждого потока ваш notifyOfThreadCompleteметод будет вызываться с экземпляром потока, который только что завершился (или потерпел крах).

Обратите внимание, что лучше было бы implements Runnable, чем extends Threadдля, NotifyingThreadпоскольку расширение Thread обычно не рекомендуется в новом коде. Но я кодирую ваш вопрос. Если вы измените NotifyingThreadкласс для реализации, Runnableвам придется изменить часть кода, который управляет потоками, что довольно просто сделать.

Эдди
источник
Здравствуй!! Мне нравится последняя идея. Я реализовать слушателя для этого? Спасибо
Рикардо Фелгейрас
4
но при использовании этого подхода notifiyListeners вызывается внутри run (), поэтому он будет вызываться внутри потока, и дальнейшие вызовы будут выполняться там же, не так ли?
Jordi Puigdellívol
1
@Eddie Jordi спрашивал, можно ли вызвать notifyметод не внутри runметода, а после него.
Tomasz Dzięcielewski
1
На самом деле вопрос: как вы получаете OFF вторичной нити в настоящее время. Я знаю, что все готово, но как мне теперь получить доступ к главному потоку?
Патрик
1
Этот поток безопасен? Похоже, что notifyListeners (и, следовательно, notifyOfThreadComplete) будет вызываться в NotifyingThread, а не в потоке, создавшем сам Listener.
Аарон
13

Решение с использованием CyclicBarrier

public class Downloader {
  private CyclicBarrier barrier;
  private final static int NUMBER_OF_DOWNLOADING_THREADS;

  private DownloadingThread extends Thread {
    private final String url;
    public DownloadingThread(String url) {
      super();
      this.url = url;
    }
    @Override
    public void run() {
      barrier.await(); // label1
      download(url);
      barrier.await(); // label2
    }
  }
  public void startDownload() {
    // plus one for the main thread of execution
    barrier = new CyclicBarrier(NUMBER_OF_DOWNLOADING_THREADS + 1); // label0
    for (int i = 0; i < NUMBER_OF_DOWNLOADING_THREADS; i++) {
      new DownloadingThread("http://www.flickr.com/someUser/pic" + i + ".jpg").start();
    }
    barrier.await(); // label3
    displayMessage("Please wait...");
    barrier.await(); // label4
    displayMessage("Finished");
  }
}

label0 - создается циклический барьер с количеством сторон, равным количеству выполняемых потоков плюс один для основного потока выполнения (в котором выполняется startDownload ())

метка 1 - n-ый поток загрузки входит в зал ожидания

ярлык 3 - NUMBER_OF_DOWNLOADING_THREADS вошли в комнату ожидания. Основной поток выполнения освобождает их, чтобы они начали выполнять свои задания по загрузке более или менее в одно и то же время.

метка 4 - основной поток выполнения входит в комнату ожидания. Это самая «сложная» часть кода для понимания. Неважно, какой поток во второй раз войдет в комнату ожидания. Важно, чтобы любой поток, входящий в комнату последним, гарантировал, что все другие потоки загрузки завершили свои задания загрузки.

метка 2 - n-й DownloadingThread завершил загрузку и перешел в комнату ожидания. Если это последний, то есть уже NUMBER_OF_DOWNLOADING_THREADS вошли в него, включая основной поток выполнения, основной поток продолжит свое выполнение только после того, как все остальные потоки завершат загрузку.

Борис Павлович
источник
9

Вы действительно должны предпочесть решение, которое использует java.util.concurrent. Найдите и прочтите по теме Джоша Блоха и / или Брайана Гетца.

Если вы не используете потоки java.util.concurrent.*и берете на себя ответственность за их использование напрямую, вам, вероятно, следует использовать его, join()чтобы узнать, когда поток завершен. Вот супер простой механизм обратного вызова. Сначала расширьте Runnableинтерфейс, чтобы иметь обратный вызов:

public interface CallbackRunnable extends Runnable {
    public void callback();
}

Затем создайте Executor, который выполнит ваш runnable и перезвонит вам, когда это будет сделано.

public class CallbackExecutor implements Executor {

    @Override
    public void execute(final Runnable r) {
        final Thread runner = new Thread(r);
        runner.start();
        if ( r instanceof CallbackRunnable ) {
            // create a thread to perform the callback
            Thread callerbacker = new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        // block until the running thread is done
                        runner.join();
                        ((CallbackRunnable)r).callback();
                    }
                    catch ( InterruptedException e ) {
                        // someone doesn't want us running. ok, maybe we give up.
                    }
                }
            });
            callerbacker.start();
        }
    }

}

Другая очевидная вещь, которую нужно добавить в ваш CallbackRunnableинтерфейс, - это средство для обработки любых исключений, поэтому, возможно, поместите public void uncaughtException(Throwable e);строку туда и в свой исполнитель, установите Thread.UncaughtExceptionHandler, чтобы отправить вас к этому методу интерфейса.

Но от этого действительно начинает пахнуть java.util.concurrent.Callable. Вам действительно стоит взглянуть на использование, java.util.concurrentесли ваш проект это позволяет.

broc.seib
источник
Я немного не понимаю, что вы получаете от этого механизма обратного вызова по сравнению с простым вызовом, runner.join()а затем с любым кодом, который вы хотите после этого, поскольку вы знаете, что поток завершился. Просто вы можете определить этот код как свойство runnable, чтобы у вас были разные вещи для разных runnables?
Стивен
2
Да, runner.join()это самый простой способ подождать. Я предполагал, что OP не хочет блокировать свой основной вызывающий поток, так как они просят «уведомлять» о каждой загрузке, которая может завершаться в любом порядке. Это предлагало один способ получать уведомления асинхронно.
broc.seib
4

Вы хотите дождаться их завершения? Если да, используйте метод Join.

Также существует свойство isAlive, если вы просто хотите его проверить.

Джонатан Аллен
источник
3
Обратите внимание, что isAlive возвращает false, если поток еще не начал выполняться (даже если ваш собственный поток уже вызвал для него start).
Том Хотин - tackline
@ TomHawtin-tackline, ты в этом уверен? Это противоречило бы документации Java (« Поток жив, если он был запущен и еще не умер» - docs.oracle.com/javase/6/docs/api/java/lang/… ). Это также противоречило бы ответам здесь ( stackoverflow.com/questions/17293304/… )
Стивен
@Stephen Я давно это не писал, но, похоже, это правда. Я полагаю, это вызвало у других людей проблемы, которые были свежи в моей памяти девять лет назад. То, что именно можно наблюдать, будет зависеть от реализации. Вы рассказать , Threadчтобы start, что нить делает, но вызов возвращается немедленно. isAliveдолжен быть простой тест флага, но когда я погуглил, метод был native.
Том Хотин - tackline
4

Вы можете опросить экземпляр потока с помощью getState (), который возвращает экземпляр перечисления Thread.State с одним из следующих значений:

*  NEW
  A thread that has not yet started is in this state.
* RUNNABLE
  A thread executing in the Java virtual machine is in this state.
* BLOCKED
  A thread that is blocked waiting for a monitor lock is in this state.
* WAITING
  A thread that is waiting indefinitely for another thread to perform a particular action is in this state.
* TIMED_WAITING
  A thread that is waiting for another thread to perform an action for up to a specified waiting time is in this state.
* TERMINATED
  A thread that has exited is in this state.

Однако я думаю, что было бы лучше иметь главный поток, который ожидает завершения трех дочерних элементов, а затем мастер продолжит выполнение, когда остальные 3 закончатся.

Микель
источник
Ожидание выхода трех детей может не соответствовать парадигме использования. Если это менеджер загрузок, они могут захотеть начать 15 загрузок и просто удалить статус из строки состояния или предупредить пользователя о завершении загрузки, и в этом случае обратный вызов будет работать лучше.
digitaljoel
3

Вы также можете использовать Executorsобъект для создания пула потоков ExecutorService . Затем используйте этот invokeAllметод для запуска каждого из ваших потоков и получения Futures. Это будет заблокировано, пока все не закончат выполнение. Другой вариант - выполнить каждый из них, используя пул, а затем вызвать awaitTerminationблокировку, пока пул не завершит выполнение. Просто не забудьте вызвать shutdown(), когда закончите добавлять задачи.

Дж. Гиттер
источник
2

Многое изменилось за последние 6 лет в области многопоточности.

Вместо использования join()и блокировки API вы можете использовать

1. ExecutorService invokeAll() API

Выполняет заданные задачи, возвращая список Futures с их статусом и результатами, когда все выполнено.

2. CountDownLatch

Средство синхронизации, позволяющее одному или нескольким потокам ожидать завершения набора операций, выполняемых в других потоках.

A CountDownLatchинициализируется заданным счетчиком. Методы await блокируются до тех пор, пока текущий счетчик не достигнет нуля из-за вызовов countDown()метода, после чего все ожидающие потоки освобождаются, и любые последующие вызовы await немедленно возвращаются. Это однократное явление - счетчик нельзя сбросить. Если вам нужна версия, которая сбрасывает счетчик, рассмотрите возможность использования CyclicBarrier.

3. ForkJoinPool или newWorkStealingPool()в Executors - другой способ

4.Iterate через все Futureзадачи из Подайте на ExecutorServiceи проверить состояние с блокировкой вызова get()на Futureобъект

Взгляните на связанные вопросы SE:

Как дождаться потока, который порождает свой собственный поток?

Исполнители: Как синхронно дождаться завершения всех задач, если задачи создаются рекурсивно?

Равиндра бабу
источник
2

Я бы посоветовал взглянуть на javadoc для класса Thread .

У вас есть несколько механизмов для управления потоками.

  • Ваш основной поток может join()последовательно выполнять три потока и не будет продолжать работу, пока не будут выполнены все три.

  • Периодически опрашивайте состояние порожденных потоков.

  • Поместите все порождены потоков в отдельный ThreadGroupи опрашивать activeCount()на ThreadGroupи ждать его , чтобы добраться до 0.

  • Настройте пользовательский интерфейс обратного вызова или прослушивателя для межпотокового взаимодействия.

Я уверен, что я все еще скучаю по множеству других способов.

digitaljoel
источник
1

Вот решение, которое простое, короткое, легкое для понимания и идеально подходит для меня. Мне нужно было рисовать на экране, когда заканчивается другой поток; но не мог, потому что главный поток контролирует экран. Так:

(1) Я создал глобальную переменную: boolean end1 = false;поток устанавливает для нее значение true при завершении. Это подхватывается в основном потоке циклом postDelayed, где на него и реагируют.

(2) Моя ветка содержит:

void myThread() {
    end1 = false;
    new CountDownTimer(((60000, 1000) { // milliseconds for onFinish, onTick
        public void onFinish()
        {
            // do stuff here once at end of time.
            end1 = true; // signal that the thread has ended.
        }
        public void onTick(long millisUntilFinished)
        {
          // do stuff here repeatedly.
        }
    }.start();

}

(3) К счастью, postDelayed выполняется в основном потоке, так что именно здесь проверяется другой поток раз в секунду. Когда другой поток заканчивается, он может начать все, что мы захотим делать дальше.

Handler h1 = new Handler();

private void checkThread() {
   h1.postDelayed(new Runnable() {
      public void run() {
         if (end1)
            // resond to the second thread ending here.
         else
            h1.postDelayed(this, 1000);
      }
   }, 1000);
}

(4) Наконец, запустите все это где-нибудь в вашем коде, вызвав:

void startThread()
{
   myThread();
   checkThread();
}
DreamMaster Pro
источник
1

Думаю, самый простой способ - использовать ThreadPoolExecutorкласс.

  1. У него есть очередь, и вы можете установить, сколько потоков должно работать параллельно.
  2. У него есть хорошие методы обратного вызова:

Крюковые методы

Этот класс предоставляет защищенные переопределяемые методы beforeExecute(java.lang.Thread, java.lang.Runnable)и afterExecute(java.lang.Runnable, java.lang.Throwable)методы, которые вызываются до и после выполнения каждой задачи. Их можно использовать для управления средой выполнения; например, повторная инициализация ThreadLocals, сбор статистики или добавление записей журнала. Кроме того, метод terminated()может быть переопределен для выполнения любой специальной обработки, которая должна выполняться после полного завершения Executor.

что именно то, что нам нужно. Мы переопределим afterExecute()получение обратных вызовов после завершения каждого потока и переопределим, terminated()чтобы знать, когда все потоки завершены.

Итак, вот что вам следует делать

  1. Создайте исполнителя:

    private ThreadPoolExecutor executor;
    private int NUMBER_OF_CORES = Runtime.getRuntime().availableProcessors();    
    
    
    
    private void initExecutor() {
    
    executor = new ThreadPoolExecutor(
            NUMBER_OF_CORES * 2,  //core pool size
            NUMBER_OF_CORES * 2, //max pool size
            60L, //keep aive time
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<Runnable>()
    ) {
    
        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            super.afterExecute(r, t);
                //Yet another thread is finished:
                informUiAboutProgress(executor.getCompletedTaskCount(), listOfUrisToProcess.size());
            }
        }
    
    };
    
        @Override
        protected void terminated() {
            super.terminated();
            informUiThatWeAreDone();
        }
    
    }
  2. И начните свои темы:

    private void startTheWork(){
        for (Uri uri : listOfUrisToProcess) {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    doSomeHeavyWork(uri);
                }
            });
        }
        executor.shutdown(); //call it when you won't add jobs anymore 
    }

Внутренний метод informUiThatWeAreDone();делает все, что вам нужно, когда все потоки завершены, например, обновляет пользовательский интерфейс.

ПРИМЕЧАНИЕ. Не забывайте об использовании synchronizedметодов, поскольку вы выполняете свою работу параллельно, и БУДЬТЕ ОСТОРОЖНЫ, если решите вызвать synchronizedметод из другого synchronizedметода! Это часто приводит к тупикам

Надеюсь это поможет!

Кирилл Кармазин
источник
0

Вы также можете использовать SwingWorker, который имеет встроенную поддержку изменения свойств. См. Метод addPropertyChangeListener () или get () для примера прослушивателя изменения состояния.

akarnokd
источник
0

Посмотрите документацию Java для класса Thread. Вы можете проверить состояние потока. Если вы поместите три потока в переменные-члены, тогда все три потока смогут читать состояния друг друга.

Однако вы должны быть немного осторожны, потому что вы можете вызвать состояние гонки между потоками. Просто постарайтесь избежать сложной логики, основанной на состоянии других потоков. Определенно избегайте записи нескольких потоков в одни и те же переменные.

Дон Киркби
источник