У меня есть метод, который возвращает List
фьючерсы
List<Future<O>> futures = getFutures();
Теперь я хочу дождаться, пока либо все фьючерсы не будут успешно обработаны, либо какая-либо из задач, вывод которых возвращается будущим, выдает исключение. Даже если одна задача выдает исключение, нет смысла ждать другого будущего.
Простым подходом было бы
wait() {
For(Future f : futures) {
try {
f.get();
} catch(Exception e) {
//TODO catch specific exception
// this future threw exception , means somone could not do its task
return;
}
}
}
Но проблема здесь в том, что если, например, 4-е будущее выдает исключение, я буду без необходимости ждать, пока будут доступны первые 3 фьючерса.
Как это решить? Будет ли отсчитывать помощь защелки? Я не могу использовать Future, isDone
потому что в java-документе написано
boolean isDone()
Returns true if this task completed. Completion may be due to normal termination, an exception, or cancellation -- in all of these cases, this method will return true.
java
multithreading
future
user93796
источник
источник
ExecutionService
для каждой «партии» задач, передать их ему, а затем немедленно закрыть службу и использоватьawaitTermination()
ее, я полагаю.CountDownLatch
если вы обернули тело всех своих фьючерсов в a,try..finally
чтобы убедиться, что защелка также уменьшается.Ответы:
Вы можете использовать CompletionService для получения фьючерсов, как только они будут готовы, и если один из них выдает исключение, отмените обработку. Что-то вроде этого:
Executor executor = Executors.newFixedThreadPool(4); CompletionService<SomeResult> completionService = new ExecutorCompletionService<SomeResult>(executor); //4 tasks for(int i = 0; i < 4; i++) { completionService.submit(new Callable<SomeResult>() { public SomeResult call() { ... return result; } }); } int received = 0; boolean errors = false; while(received < 4 && !errors) { Future<SomeResult> resultFuture = completionService.take(); //blocks if none available try { SomeResult result = resultFuture.get(); received ++; ... // do something with the result } catch(Exception e) { //log errors = true; } }
Я думаю, вы можете улучшить, отменив все еще выполняющиеся задачи, если одна из них выдает ошибку.
источник
CompletionService
.Если вы используете Java 8, вы можете сделать это проще с помощью CompletableFuture и CompletableFuture.allOf , который применяет обратный вызов только после того, как будут выполнены все предоставленные CompletableFutures.
// Waits for *all* futures to complete and returns a list of results. // If *any* future completes exceptionally then the resulting future will also complete exceptionally. public static <T> CompletableFuture<List<T>> all(List<CompletableFuture<T>> futures) { CompletableFuture[] cfs = futures.toArray(new CompletableFuture[futures.size()]); return CompletableFuture.allOf(cfs) .thenApply(ignored -> futures.stream() .map(CompletableFuture::join) .collect(Collectors.toList()) ); }
источник
Future
экземпляры, вы не можете применить этот метод. ПреобразоватьFuture
вCompletableFuture
.Использовать
CompletableFuture
в Java 8// Kick of multiple, asynchronous lookups CompletableFuture<User> page1 = gitHubLookupService.findUser("Test1"); CompletableFuture<User> page2 = gitHubLookupService.findUser("Test2"); CompletableFuture<User> page3 = gitHubLookupService.findUser("Test3"); // Wait until they are all done CompletableFuture.allOf(page1,page2,page3).join(); logger.info("--> " + page1.get());
источник
Вы можете использовать ExecutorCompletionService . В документации даже есть пример для вашего точного варианта использования:
void solve(Executor e, Collection<Callable<Result>> solvers) throws InterruptedException { CompletionService<Result> ecs = new ExecutorCompletionService<Result>(e); int n = solvers.size(); List<Future<Result>> futures = new ArrayList<Future<Result>>(n); Result result = null; try { for (Callable<Result> s : solvers) futures.add(ecs.submit(s)); for (int i = 0; i < n; ++i) { try { Result r = ecs.take().get(); if (r != null) { result = r; break; } } catch (ExecutionException ignore) { } } } finally { for (Future<Result> f : futures) f.cancel(true); } if (result != null) use(result); }
Здесь важно отметить, что ecs.take () получит первую выполненную задачу, а не только первую отправленную. Таким образом, вы должны получить их в порядке завершения выполнения (или выброса исключения).
источник
Если вы используете Java 8 и не хотите манипулировать
CompletableFuture
s, я написал инструмент для получения результатов приList<Future<T>>
использовании потоковой передачи. Ключ в том, что вам запрещеноmap(Future::get)
как его бросать.public final class Futures { private Futures() {} public static <E> Collector<Future<E>, Collection<E>, List<E>> present() { return new FutureCollector<>(); } private static class FutureCollector<T> implements Collector<Future<T>, Collection<T>, List<T>> { private final List<Throwable> exceptions = new LinkedList<>(); @Override public Supplier<Collection<T>> supplier() { return LinkedList::new; } @Override public BiConsumer<Collection<T>, Future<T>> accumulator() { return (r, f) -> { try { r.add(f.get()); } catch (InterruptedException e) {} catch (ExecutionException e) { exceptions.add(e.getCause()); } }; } @Override public BinaryOperator<Collection<T>> combiner() { return (l1, l2) -> { l1.addAll(l2); return l1; }; } @Override public Function<Collection<T>, List<T>> finisher() { return l -> { List<T> ret = new ArrayList<>(l); if (!exceptions.isEmpty()) throw new AggregateException(exceptions, ret); return ret; }; } @Override public Set<java.util.stream.Collector.Characteristics> characteristics() { return java.util.Collections.emptySet(); } }
Для этого нужен инструмент,
AggregateException
который работает как C #public class AggregateException extends RuntimeException { /** * */ private static final long serialVersionUID = -4477649337710077094L; private final List<Throwable> causes; private List<?> successfulElements; public AggregateException(List<Throwable> causes, List<?> l) { this.causes = causes; successfulElements = l; } public AggregateException(List<Throwable> causes) { this.causes = causes; } @Override public synchronized Throwable getCause() { return this; } public List<Throwable> getCauses() { return causes; } public List<?> getSuccessfulElements() { return successfulElements; } public void setSuccessfulElements(List<?> successfulElements) { this.successfulElements = successfulElements; } }
Этот компонент действует точно так же, как Task.WaitAll в C # . Я работаю над вариантом, который делает то же самое
CompletableFuture.allOf
(эквивалентноTask.WhenAll
)Причина, по которой я это сделал, заключается в том, что я использую Spring
ListenableFuture
и не хочу переносить,CompletableFuture
несмотря на то, что это более стандартный способисточник
Если вы хотите объединить Список CompletableFutures, вы можете сделать это:
List<CompletableFuture<Void>> futures = new ArrayList<>(); // ... Add futures to this ArrayList of CompletableFutures // CompletableFuture.allOf() method demand a variadic arguments // You can use this syntax to pass a List instead CompletableFuture<Void> allFutures = CompletableFuture.allOf( futures.toArray(new CompletableFuture[futures.size()])); // Wait for all individual CompletableFuture to complete // All individual CompletableFutures are executed in parallel allFutures.get();
Для получения дополнительной информации о Future и CompletableFuture, полезные ссылки:
1. Future: https://www.baeldung.com/java-future
2. CompletableFuture: https://www.baeldung.com/java-completablefuture
3. CompletableFuture: https : //www.callicoder.com/java-8-completablefuture-tutorial/
источник
возможно, это поможет (ничто не заменит необработанным потоком, да!). Я предлагаю запускать каждого
Future
парня с отдельным потоком (они идут параллельно), а затем, когда возникает одна из полученных ошибок, это просто сигнализирует менеджеру (Handler
классу).class Handler{ //... private Thread thisThread; private boolean failed=false; private Thread[] trds; public void waitFor(){ thisThread=Thread.currentThread(); List<Future<Object>> futures = getFutures(); trds=new Thread[futures.size()]; for (int i = 0; i < trds.length; i++) { RunTask rt=new RunTask(futures.get(i), this); trds[i]=new Thread(rt); } synchronized (this) { for(Thread tx:trds){ tx.start(); } } for(Thread tx:trds){ try {tx.join(); } catch (InterruptedException e) { System.out.println("Job failed!");break; } }if(!failed){System.out.println("Job Done");} } private List<Future<Object>> getFutures() { return null; } public synchronized void cancelOther(){if(failed){return;} failed=true; for(Thread tx:trds){ tx.stop();//Deprecated but works here like a boss }thisThread.interrupt(); } //... } class RunTask implements Runnable{ private Future f;private Handler h; public RunTask(Future f,Handler h){this.f=f;this.h=h;} public void run(){ try{ f.get();//beware about state of working, the stop() method throws ThreadDeath Error at any thread state (unless it blocked by some operation) }catch(Exception e){System.out.println("Error, stopping other guys...");h.cancelOther();} catch(Throwable t){System.out.println("Oops, some other guy has stopped working...");} } }
Я должен сказать, что приведенный выше код приведет к ошибке (не проверял), но я надеюсь, что смогу объяснить решение. пожалуйста, попробуйте.
источник
/** * execute suppliers as future tasks then wait / join for getting results * @param functors a supplier(s) to execute * @return a list of results */ private List getResultsInFuture(Supplier<?>... functors) { CompletableFuture[] futures = stream(functors) .map(CompletableFuture::supplyAsync) .collect(Collectors.toList()) .toArray(new CompletableFuture[functors.length]); CompletableFuture.allOf(futures).join(); return stream(futures).map(a-> { try { return a.get(); } catch (InterruptedException | ExecutionException e) { //logger.error("an error occurred during runtime execution a function",e); return null; } }).collect(Collectors.toList()); };
источник
CompletionService будет принимать ваши вызываемые объекты с помощью метода .submit (), а вы можете получить вычисленные фьючерсы с помощью метода .take ().
Одна вещь, которую вы не должны забыть, - это завершить ExecutorService, вызвав метод .shutdown (). Также вы можете вызывать этот метод, только если вы сохранили ссылку на службу исполнителя, поэтому обязательно сохраните ее.
Пример кода - для параллельной работы с фиксированным количеством рабочих элементов:
ExecutorService service = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); CompletionService<YourCallableImplementor> completionService = new ExecutorCompletionService<YourCallableImplementor>(service); ArrayList<Future<YourCallableImplementor>> futures = new ArrayList<Future<YourCallableImplementor>>(); for (String computeMe : elementsToCompute) { futures.add(completionService.submit(new YourCallableImplementor(computeMe))); } //now retrieve the futures after computation (auto wait for it) int received = 0; while(received < elementsToCompute.size()) { Future<YourCallableImplementor> resultFuture = completionService.take(); YourCallableImplementor result = resultFuture.get(); received ++; } //important: shutdown your ExecutorService service.shutdown();
Пример кода - для параллельной работы над динамическим количеством рабочих элементов:
public void runIt(){ ExecutorService service = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); CompletionService<CallableImplementor> completionService = new ExecutorCompletionService<CallableImplementor>(service); ArrayList<Future<CallableImplementor>> futures = new ArrayList<Future<CallableImplementor>>(); //Initial workload is 8 threads for (int i = 0; i < 9; i++) { futures.add(completionService.submit(write.new CallableImplementor())); } boolean finished = false; while (!finished) { try { Future<CallableImplementor> resultFuture; resultFuture = completionService.take(); CallableImplementor result = resultFuture.get(); finished = doSomethingWith(result.getResult()); result.setResult(null); result = null; resultFuture = null; //After work package has been finished create new work package and add it to futures futures.add(completionService.submit(write.new CallableImplementor())); } catch (InterruptedException | ExecutionException e) { //handle interrupted and assert correct thread / work packet count } } //important: shutdown your ExecutorService service.shutdown(); } public class CallableImplementor implements Callable{ boolean result; @Override public CallableImplementor call() throws Exception { //business logic goes here return this; } public boolean getResult() { return result; } public void setResult(boolean result) { this.result = result; } }
источник
У меня есть служебный класс, который содержит:
@FunctionalInterface public interface CheckedSupplier<X> { X get() throws Throwable; } public static <X> Supplier<X> uncheckedSupplier(final CheckedSupplier<X> supplier) { return () -> { try { return supplier.get(); } catch (final Throwable checkedException) { throw new IllegalStateException(checkedException); } }; }
Как только у вас есть это, используя статический импорт, вы можете просто дождаться всех фьючерсов следующим образом:
вы также можете собрать все их результаты следующим образом:
Просто перечитываю мой старый пост и замечаю, что у вас было еще одно горе:
В этом случае простое решение - сделать это параллельно:
Таким образом, первое исключение, хотя и не остановит будущее, нарушит оператор forEach, как в последовательном примере, но поскольку все ждут параллельно, вам не придется ждать завершения первых трех.
источник
import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.stream.Collectors; import java.util.stream.Stream; public class Stack2 { public static void waitFor(List<Future<?>> futures) { List<Future<?>> futureCopies = new ArrayList<Future<?>>(futures);//contains features for which status has not been completed while (!futureCopies.isEmpty()) {//worst case :all task worked without exception, then this method should wait for all tasks Iterator<Future<?>> futureCopiesIterator = futureCopies.iterator(); while (futureCopiesIterator.hasNext()) { Future<?> future = futureCopiesIterator.next(); if (future.isDone()) {//already done futureCopiesIterator.remove(); try { future.get();// no longer waiting } catch (InterruptedException e) { //ignore //only happen when current Thread interrupted } catch (ExecutionException e) { Throwable throwable = e.getCause();// real cause of exception futureCopies.forEach(f -> f.cancel(true));//cancel other tasks that not completed return; } } } } } public static void main(String[] args) { ExecutorService executorService = Executors.newFixedThreadPool(3); Runnable runnable1 = new Runnable (){ public void run(){ try { Thread.sleep(5000); } catch (InterruptedException e) { } } }; Runnable runnable2 = new Runnable (){ public void run(){ try { Thread.sleep(4000); } catch (InterruptedException e) { } } }; Runnable fail = new Runnable (){ public void run(){ try { Thread.sleep(1000); throw new RuntimeException("bla bla bla"); } catch (InterruptedException e) { } } }; List<Future<?>> futures = Stream.of(runnable1,fail,runnable2) .map(executorService::submit) .collect(Collectors.toList()); double start = System.nanoTime(); waitFor(futures); double end = (System.nanoTime()-start)/1e9; System.out.println(end +" seconds"); } }
источник