Как CountDownLatch используется в многопоточности Java?

184

Может ли кто-нибудь помочь мне понять, что такое Java CountDownLatchи когда его использовать?

У меня нет четкого представления о том, как работает эта программа. Как я понимаю, все три потока запускаются одновременно, и каждый поток вызовет CountDownLatch через 3000 мс. Так что обратный отсчет будет уменьшаться один за другим. После того, как защелка станет нулевой, программа напечатает «Завершено». Может быть, то, как я понял, неверно.

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class Processor implements Runnable {
    private CountDownLatch latch;

    public Processor(CountDownLatch latch) {
        this.latch = latch;
    }

    public void run() {
        System.out.println("Started.");

        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        latch.countDown();
    }
}

// ------------------------------------------------ -----

public class App {

    public static void main(String[] args) {

        CountDownLatch latch = new CountDownLatch(3); // coundown from 3 to 0

        ExecutorService executor = Executors.newFixedThreadPool(3); // 3 Threads in pool

        for(int i=0; i < 3; i++) {
            executor.submit(new Processor(latch)); // ref to latch. each time call new Processes latch will count down by 1
        }

        try {
            latch.await();  // wait until latch counted down to 0
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("Completed.");
    }

}
амал
источник
9
Я просто использовал ваш пример кода вопроса для пакета параллельной службы Android, и он работал как шарм. Спасибо вам большое!
Roisgoen
Вот из этого видео 2012 года, которое демонстрирует удивительное сходство с примером, показанным здесь. Для всех, кто интересуется, это часть учебного пособия по многопоточности Java от парня по имени Джон. Мне нравится Джон. Настоятельно рекомендуется.
Элия ​​Грейди

Ответы:

194

Да, вы правильно поняли. CountDownLatchработает по принципу защелки, основной поток будет ждать, пока ворота не откроются. Один поток ожидает n потоков, указанных при создании CountDownLatch.

Любой поток, обычно основной поток приложения, вызов которого CountDownLatch.await()будет ожидать, пока счетчик не достигнет нуля или не прервется другим потоком. Все остальные потоки обязаны вести обратный отсчет, вызывая их, CountDownLatch.countDown()когда они завершены или готовы.

Как только счет достигает нуля, ожидающий поток продолжается. Одним из недостатков / преимуществ CountDownLatchявляется то, что его нельзя использовать повторно: как только счет достигнет нуля, вы больше не сможете его использовать CountDownLatch.

Редактировать:

Используется, CountDownLatchкогда одному потоку (например, основному потоку) требуется дождаться завершения одного или нескольких потоков, прежде чем он сможет продолжить обработку.

Классическим примером использования CountDownLatchв Java является базовое Java-приложение на стороне сервера, которое использует архитектуру служб, где несколько служб предоставляются несколькими потоками, и приложение не может начать обработку, пока все службы не будут успешно запущены.

У вопроса PS OP довольно простой пример, поэтому я его не включил.

NikolaB
источник
1
Спасибо за ответ. Не могли бы вы привести пример применения защелки CountDown?
Амаль
11
руководство по использованию CountDownLatch находится здесь howtodoinjava.com/2013/07/18/…
thiagoh
1
@NikolaB Но в данном примере мы можем достичь того же результата, используя метод соединения, не так ли?
Викас Верма
3
Я бы посчитал одноразовое использование преимуществом: вы уверены, что никто не сможет сбросить его или увеличить количество.
ataulm
3
Хорошее объяснение. Но я бы немного не согласился по этому вопросу One thread waits for n number of threads specified while creating CountDownLatch in Java. Если вам нужен такой механизм, то его целесообразно использовать CyclicBarrier. Фундаментальное концептуальное различие между этими двумя, как указано в Java concurrency in Practiceэто: Latches are for waiting for events; barriers are for waiting for other threads. cyclicBarrier.await()переходит в состояние блокировки
Рахул Дев Мишра
43

CountDownLatchв Java это тип синхронизатора, который позволяет Thread подождать один или несколько Threadсекунд, прежде чем он начнет обработку.

CountDownLatchработает по принципу защелки, поток будет ждать, пока ворота не откроются. Один поток ожидает nколичество потоков, указанное при создании CountDownLatch.

например final CountDownLatch latch = new CountDownLatch(3);

Здесь мы устанавливаем счетчик на 3.

Любой поток, обычно основной поток приложения, вызов которого CountDownLatch.await()будет ожидать, пока счетчик не достигнет нуля или не прервется другим Thread. Все остальные потоки обязаны выполнять обратный отсчет путем вызова CountDownLatch.countDown()после завершения или готовности к работе. как только счет достигает нуля, Threadзапускается ожидание.

Здесь счетчик уменьшается по CountDownLatch.countDown()методу.

Метод, Threadкоторый вызывает await()метод, будет ждать, пока начальное число не достигнет нуля.

Чтобы счетчик равнялся нулю, другие потоки должны вызывать countDown()метод. Как только счет станет равным нулю, поток, вызвавший await()метод, возобновит работу (начнет его выполнение).

Недостатком CountDownLatchявляется то, что он не может быть использован повторно: как только счет станет равным нулю, он больше не будет использоваться.

Вишал Аккалкоте
источник
мы используем new CountDownLatch(3)как у нас есть 3 потока из newFixedThreadPool определенного?
Chaklader Асфак Арефе
не должно "до того, как он начнет обрабатывать" быть "до того, как он продолжит обработку"?
Мария Инес Парнисари
@Arefe Да, это количество потоков, проходящих через ваш блок кода
Vishal Akkalkote
23

NikolaB объяснил это очень хорошо, однако пример будет полезен для понимания, так что вот один простой пример ...

 import java.util.concurrent.*;


  public class CountDownLatchExample {

  public static class ProcessThread implements Runnable {

    CountDownLatch latch;
    long workDuration;
    String name;

    public ProcessThread(String name, CountDownLatch latch, long duration){
        this.name= name;
        this.latch = latch;
        this.workDuration = duration;
    }


    public void run() {
        try {
            System.out.println(name +" Processing Something for "+ workDuration/1000 + " Seconds");
            Thread.sleep(workDuration);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(name+ "completed its works");
        //when task finished.. count down the latch count...

        // basically this is same as calling lock object notify(), and object here is latch
        latch.countDown();
    }
}


public static void main(String[] args) {
    // Parent thread creating a latch object
    CountDownLatch latch = new CountDownLatch(3);

    new Thread(new ProcessThread("Worker1",latch, 2000)).start(); // time in millis.. 2 secs
    new Thread(new ProcessThread("Worker2",latch, 6000)).start();//6 secs
    new Thread(new ProcessThread("Worker3",latch, 4000)).start();//4 secs


    System.out.println("waiting for Children processes to complete....");
    try {
        //current thread will get notified if all chidren's are done 
        // and thread will resume from wait() mode.
        latch.await();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

    System.out.println("All Process Completed....");

    System.out.println("Parent Thread Resuming work....");



     }
  }
vikashait
источник
22

Он используется, когда мы хотим подождать, пока несколько потоков выполнят свою задачу. Это похоже на присоединение к темам.

Где мы можем использовать CountDownLatch

Рассмотрим сценарий, где у нас есть требование, когда у нас есть три потока «A», «B» и «C», и мы хотим запустить поток «C» только тогда, когда потоки «A» и «B» завершают или частично завершают свою задачу.

Это может быть применено к реальному сценарию IT

Рассмотрим сценарий, в котором менеджер разделяет модули между группами разработчиков (A и B) и хочет назначить их команде QA для тестирования только тогда, когда обе команды выполнят свою задачу.

public class Manager {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(2);
        MyDevTeam teamDevA = new MyDevTeam(countDownLatch, "devA");
        MyDevTeam teamDevB = new MyDevTeam(countDownLatch, "devB");
        teamDevA.start();
        teamDevB.start();
        countDownLatch.await();
        MyQATeam qa = new MyQATeam();
        qa.start();
    }   
}

class MyDevTeam extends Thread {   
    CountDownLatch countDownLatch;
    public MyDevTeam (CountDownLatch countDownLatch, String name) {
        super(name);
        this.countDownLatch = countDownLatch;       
    }   
    @Override
    public void run() {
        System.out.println("Task assigned to development team " + Thread.currentThread().getName());
        try {
                Thread.sleep(2000);
        } catch (InterruptedException ex) {
                ex.printStackTrace();
        }
    System.out.println("Task finished by development team Thread.currentThread().getName());
            this.countDownLatch.countDown();
    }
}

class MyQATeam extends Thread {   
    @Override
    public void run() {
        System.out.println("Task assigned to QA team");
        try {
                Thread.sleep(2000);
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
        System.out.println("Task finished by QA team");
    }
}

Вывод вышеуказанного кода будет:

Задача, поставленная перед командой разработчиков devB

Задача, поставленная перед командой разработчиков devA

Задача выполнена командой разработчиков devB

Задача выполнена командой разработчиков devA

Задача, назначенная команде QA

Задача выполнена командой QA

Здесь метод await () ожидает, когда флаг countdownlatch станет равным 0, а метод countDown () уменьшает флаг countdownlatch на 1.

Ограничение JOIN: Приведенный выше пример также может быть достигнут с помощью JOIN, но JOIN нельзя использовать в двух сценариях:

  1. Когда мы используем ExecutorService вместо класса Thread для создания потоков.
  2. Измените приведенный выше пример, когда Manager хочет передать код команде QA, как только Development выполнит свою задачу на 80%. Это означает, что CountDownLatch позволяет нам изменять реализацию, которую можно использовать для ожидания другого потока для их частичного выполнения.
V Джо
источник
3

CoundDownLatch позволяет вам заставить поток ждать, пока все остальные потоки не завершат свое выполнение.

Псевдокод может быть:

// Main thread starts
// Create CountDownLatch for N threads
// Create and start N threads
// Main thread waits on latch
// N threads completes there tasks are returns
// Main thread resume execution
Кристоф Русси
источник
Возможно, вы захотите удалить все свое описание из блока кода
Пол Ло
Лучший комментарий, хотя. Мне нравятся эти "точечные" комментарии вместо теоретических объяснений.
Renatoaraujoc
2

Один хороший пример того, когда использовать что-то подобное, - это Java Simple Serial Connector для доступа к последовательным портам. Обычно вы что-то записываете в порт, и асинхронно в другом потоке устройство отвечает SerialPortEventListener. Как правило, вы хотите сделать паузу после записи в порт, чтобы дождаться ответа. Ручная обработка потоков для этого сценария чрезвычайно сложна, но использовать Countdownlatch легко. Прежде чем думать, что вы можете сделать это по-другому, будьте осторожны с условиями гонки, о которых вы никогда не думали !!

псевдокод:

CountDownLatch latch;
void writeData() { 
   latch = new CountDownLatch(1);
   serialPort.writeBytes(sb.toString().getBytes())
   try {
      latch.await(4, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
   }
}
class SerialPortReader implements SerialPortEventListener {
    public void serialEvent(SerialPortEvent event) {
        if(event.isRXCHAR()){//If data is available
            byte buffer[] = serialPort.readBytes(event.getEventValue());
            latch.countDown();
         }
     }
}

xpusostomos
источник
2

Если вы добавите отладку после вызова latch.countDown (), это может помочь вам лучше понять ее поведение.

latch.countDown();
System.out.println("DONE "+this.latch); // Add this debug

Выходные данные покажут, что число уменьшается. Это 'count' - это фактически число запущенных задач (объектов Processor), для которых вы начали, countDown () не был вызван и, следовательно, блокируется основным потоком при вызове latch.await ().

DONE java.util.concurrent.CountDownLatch@70e69696[Count = 2]
DONE java.util.concurrent.CountDownLatch@70e69696[Count = 1]
DONE java.util.concurrent.CountDownLatch@70e69696[Count = 0]
natmat
источник
2

Из документации оракула о CountDownLatch :

Средство синхронизации, которое позволяет одному или нескольким потокам ожидать завершения набора операций, выполняемых в других потоках.

А CountDownLatchинициализируется с заданным количеством. Эти awaitметоды блокируют до тех пор , текущее значение счетчика достигает нуля не связано с вызовами countDown()метода, после чего все потоки , ожидающие будут освобождены и любых последующих вызовах ждать возвращения немедленно. Это одноразовое явление - счет не может быть сброшен.

CountDownLatch - это универсальный инструмент синхронизации, который можно использовать для различных целей.

CountDownLatchИнициализируется со счетчиком одного служит простой вкл / выкл, защелка или ворот: для всех потоков , ссылающимся ждут ждать в воротах , пока не будет открыт с помощью резьбы вызова CountDown ().

CountDownLatchИнициализируется N может быть использован , чтобы сделать одну нить ждать , пока N потоков не выполнили какое - либо действие, или какое - то действие было завершено N раз.

public void await()
           throws InterruptedException

Заставляет текущий поток ждать, пока защелка не обратится к нулю, если поток не прерывается.

Если текущий счетчик равен нулю, этот метод возвращается немедленно.

public void countDown()

Уменьшает счетчик защелки, освобождая все ожидающие потоки, если счет достигает нуля.

Если текущий счетчик больше нуля, то он уменьшается. Если новый счетчик равен нулю, то все ожидающие потоки повторно включаются для целей планирования потоков.

Объяснение вашего примера.

  1. Вы установили счетчик как 3 для latchпеременной

    CountDownLatch latch = new CountDownLatch(3);
  2. Вы передали этот общий ресурс latchв рабочий поток:Processor

  3. Три Runnableэкземпляра Processorбыли представленыExecutorService executor
  4. Основной поток ( App) ожидает, что число станет нулевым с оператором ниже

     latch.await();  
  5. Processor поток спит в течение 3 секунд, а затем уменьшает значение счетчика с latch.countDown()
  6. Первый Processэкземпляр изменит количество защелок на 2 после его завершения из-заlatch.countDown() .

  7. Второй Processэкземпляр изменит количество защелок на 1 после его завершения из-за latch.countDown().

  8. Третий Processэкземпляр изменит количество защелок на 0 после его завершения из-за latch.countDown().

  9. Нулевой отсчет на защелке приводит к тому, что основной поток Appвыходит изawait

  10. Приложение App теперь печатает этот вывод: Completed

Равиндра Бабу
источник
2

Этот пример из Java Doc помог мне понять концепции:

class Driver { // ...
  void main() throws InterruptedException {
    CountDownLatch startSignal = new CountDownLatch(1);
    CountDownLatch doneSignal = new CountDownLatch(N);

    for (int i = 0; i < N; ++i) // create and start threads
      new Thread(new Worker(startSignal, doneSignal)).start();

    doSomethingElse();            // don't let run yet
    startSignal.countDown();      // let all threads proceed
    doSomethingElse();
    doneSignal.await();           // wait for all to finish
  }
}

class Worker implements Runnable {
  private final CountDownLatch startSignal;
  private final CountDownLatch doneSignal;
  Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
     this.startSignal = startSignal;
     this.doneSignal = doneSignal;
  }
  public void run() {
     try {
       startSignal.await();
       doWork();
       doneSignal.countDown();
     } catch (InterruptedException ex) {} // return;
  }

  void doWork() { ... }
}

Визуальная интерпретация:

введите описание изображения здесь

Очевидно, CountDownLatchпозволяет одному потоку (здесь Driver) ждать, пока несколько запущенных потоков (здесь Worker) не завершат их выполнение.

Саурав Саху
источник
1

Как упоминалось в JavaDoc ( https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/CountDownLatch.html ), CountDownLatch - это средство синхронизации, представленное в Java 5. Здесь синхронизация не означает ограничение доступа к критическому разделу. Но скорее последовательность действий разных потоков. Тип синхронизации, достигнутый с помощью CountDownLatch, аналогичен типу Join. Предположим, что существует поток «M», который должен ждать, пока другие рабочие потоки «T1», «T2», «T3» завершат свои задачи. До Java 1.5 это можно было сделать следующим образом: M выполняет следующий код

    T1.join();
    T2.join();
    T3.join();

Приведенный выше код гарантирует, что поток M возобновит свою работу после того, как T1, T2, T3 завершат свою работу. T1, T2, T3 могут завершить свою работу в любом порядке. То же самое может быть достигнуто с помощью CountDownLatch, где T1, T2, T3 и поток M совместно используют один и тот же объект CountDownLatch.
«M» запрашивает: countDownLatch.await();
где, как «T1», «T2», «T3» делает countDownLatch.countdown();

Недостатком метода соединения является то, что М должен знать о T1, T2, T3. Если позже добавлен новый рабочий поток T4, то M тоже должен об этом знать. Этого можно избежать с помощью CountDownLatch. После реализации последовательность действий будет [T1, T2, T3] (порядок T1, T2, T3 может быть в любом случае) -> [M]

SR Чайтанья
источник
0

Лучший пример реального времени для countDownLatch, объясненный в этой ссылке CountDownLatchExample

Эшвин Патил
источник
0
package practice;

import java.util.concurrent.CountDownLatch;

public class CountDownLatchExample {

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch c= new CountDownLatch(3);  // need to decrements the count (3) to zero by calling countDown() method so that main thread will wake up after calling await() method 
        Task t = new Task(c);
        Task t1 = new Task(c);
        Task t2 = new Task(c);
        t.start();
        t1.start();
        t2.start();
        c.await(); // when count becomes zero main thread will wake up 
        System.out.println("This will print after count down latch count become zero");
    }
}

class Task extends Thread{
    CountDownLatch c;

    public Task(CountDownLatch c) {
        this.c = c;
    }

    @Override
    public void run() {
        try {
            System.out.println(Thread.currentThread().getName());
            Thread.sleep(1000);
            c.countDown();   // each thread decrement the count by one 
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
Sumit
источник