Способ есть, но он тебе не понравится. Следующий метод преобразует a Future<T>
в a CompletableFuture<T>
:
public static <T> CompletableFuture<T> makeCompletableFuture(Future<T> future) {
if (future.isDone())
return transformDoneFuture(future);
return CompletableFuture.supplyAsync(() -> {
try {
if (!future.isDone())
awaitFutureIsDoneInForkJoinPool(future);
return future.get();
} catch (ExecutionException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
});
}
private static <T> CompletableFuture<T> transformDoneFuture(Future<T> future) {
CompletableFuture<T> cf = new CompletableFuture<>();
T result;
try {
result = future.get();
} catch (Throwable ex) {
cf.completeExceptionally(ex);
return cf;
}
cf.complete(result);
return cf;
}
private static void awaitFutureIsDoneInForkJoinPool(Future<?> future)
throws InterruptedException {
ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker() {
@Override public boolean block() throws InterruptedException {
try {
future.get();
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
return true;
}
@Override public boolean isReleasable() {
return future.isDone();
}
});
}
Очевидно, проблема с этим подходом состоит в том, что для каждого Future поток будет заблокирован, чтобы ждать результата Future, что противоречит идее Futures. В некоторых случаях можно было бы добиться большего. Однако в целом без активного ожидания результата в Будущем нет решения .
CompletableFuture.supplyAsync(supplier, new SinglethreadExecutor())
по крайней мере не блокирует потоки общего пула.Если библиотека, которую вы хотите использовать, также предлагает метод стиля обратного вызова в дополнение к стилю Future, вы можете предоставить ей обработчик, который завершает CompletableFuture без какой-либо дополнительной блокировки потоков. Вот так:
AsynchronousFileChannel open = AsynchronousFileChannel.open(Paths.get("/some/file")); // ... CompletableFuture<ByteBuffer> completableFuture = new CompletableFuture<ByteBuffer>(); open.read(buffer, position, null, new CompletionHandler<Integer, Void>() { @Override public void completed(Integer result, Void attachment) { completableFuture.complete(buffer); } @Override public void failed(Throwable exc, Void attachment) { completableFuture.completeExceptionally(exc); } }); completableFuture.thenApply(...)
Я вижу, что без обратного вызова единственный другой способ решить эту проблему - использовать цикл опроса, который помещает все ваши
Future.isDone()
проверки в один поток, а затем вызывает завершение всякий раз, когда можно получить Future.источник
Если ваш
Future
- результат вызоваExecutorService
метода (напримерsubmit()
), проще всего использовать этотCompletableFuture.runAsync(Runnable, Executor)
метод.Из
к
CompletableFuture
Затем создается «родной».РЕДАКТИРОВАТЬ: следуя комментариям @SamMefford, исправленным @MartinAndersson, если вы хотите передать a
Callable
, вам нужно позвонитьsupplyAsync()
, преобразовавCallable<T>
в aSupplier<T>
, например, с помощью:CompletableFuture.supplyAsync(() -> { try { return myCallable.call(); } catch (Exception ex) { throw new RuntimeException(ex); } // Or return default value }, myExecutor);
Поскольку
T Callable.call() throws Exception;
выдает исключение иT Supplier.get();
не делает этого, вы должны перехватить исключение, чтобы прототипы были совместимы.источник
CompletableFuture<T> future = CompletableFuture.supplyAsync(myCallable, myExecutor);
supplyAsync
получаетSupplier
. Код не будет компилироваться, если вы попытаетесь передать файлCallable
.Callable<T>
вSupplier<T>
.Я опубликовал небольшой проект будущего, который пытается сделать лучше, чем простой способ ответа.
Основная идея состоит в том, чтобы использовать только один поток (и, конечно же, не только с циклом вращения) для проверки всех состояний Futures внутри, что помогает избежать блокировки потока из пула для каждого преобразования Future -> CompletableFuture.
Пример использования:
источник
Предложение:
http://www.thedevpiece.com/converting-old-java-future-to-completablefuture/
Но, в основном:
public class CompletablePromiseContext { private static final ScheduledExecutorService SERVICE = Executors.newSingleThreadScheduledExecutor(); public static void schedule(Runnable r) { SERVICE.schedule(r, 1, TimeUnit.MILLISECONDS); } }
И CompletablePromise:
public class CompletablePromise<V> extends CompletableFuture<V> { private Future<V> future; public CompletablePromise(Future<V> future) { this.future = future; CompletablePromiseContext.schedule(this::tryToComplete); } private void tryToComplete() { if (future.isDone()) { try { complete(future.get()); } catch (InterruptedException e) { completeExceptionally(e); } catch (ExecutionException e) { completeExceptionally(e.getCause()); } return; } if (future.isCancelled()) { cancel(true); return; } CompletablePromiseContext.schedule(this::tryToComplete); } }
Пример:
public class Main { public static void main(String[] args) { final ExecutorService service = Executors.newSingleThreadExecutor(); final Future<String> stringFuture = service.submit(() -> "success"); final CompletableFuture<String> completableFuture = new CompletablePromise<>(stringFuture); completableFuture.whenComplete((result, failure) -> { System.out.println(result); }); } }
источник
CompletablePromiseContext
нестатический и взял параметр для интервала проверки (который здесь установлен на 1 мс), а затем перегрузилCompletablePromise<V>
конструктор, чтобы иметь возможность предоставить свой собственный,CompletablePromiseContext
возможно, с другим (более длинным) интервалом проверки для длительного времени,Future<V>
когда вы не 'не обязательно иметь возможность запускать обратный вызов (или создавать) сразу после завершения, и вы также можете иметь экземплярCompletablePromiseContext
для просмотра набораFuture
(если у вас их много)Позвольте мне предложить другой (надеюсь, лучший) вариант: https://github.com/vsilaev/java-async-await/tree/master/com.farata.lang.async.examples/src/main/java/com/farata / одновременный
Вкратце идея такая:
CompletableTask<V>
интерфейс - объединениеCompletionStage<V>
+RunnableFuture<V>
ExecutorService
для возвратаCompletableTask
изsubmit(...)
методов (вместоFuture<V>
)Реализация использует альтернативную реализацию CompletionStage (обратите внимание, CompletionStage, а не CompletableFuture):
Применение:
J8ExecutorService exec = J8Executors.newCachedThreadPool(); CompletionStage<String> = exec .submit( someCallableA ) .thenCombineAsync( exec.submit(someCallableB), (a, b) -> a + " " + b) .thenCombine( exec.submit(someCallableC), (ab, b) -> ab + " " + c);
источник