Разница между CompletableFuture, Future и RxJava, наблюдаемой

194

Я хотел бы знать разницу между CompletableFuture, Futureи Observable RxJava.

Я знаю, что все они асинхронные, но

Future.get() блокирует поток

CompletableFuture дает методы обратного вызова

RxJava Observable--- аналогично CompletableFutureс другими преимуществами (не уверен)

Например: если клиенту нужно сделать несколько вызовов службы и когда мы используем Futures(Java) Future.get()будет выполняться последовательно ... хотел бы знать, как лучше в RxJava ..

А в документации http://reactivex.io/intro.html сказано

Сложно использовать Futures для оптимального составления условных асинхронных потоков выполнения (или невозможно, поскольку задержки каждого запроса меняются во время выполнения). Конечно, это можно сделать, но это быстро усложняется (и поэтому подвержено ошибкам) ​​или преждевременно блокирует Future.get (), что исключает преимущество асинхронного выполнения.

Действительно интересно узнать, как RxJavaрешается эта проблема. Мне было трудно понять из документации.

shiv455
источник
Вы читали документацию для каждого? Я совершенно незнаком с RxJava, но документация выглядит очень тщательно с первого взгляда. Кажется, это не особенно сопоставимо с двумя фьючерсами.
FThompson
я прошел, но не смог понять, насколько он отличается от фьючерсов Java ... поправьте меня, если я не прав
shiv455
Чем наблюдаемые похожи на фьючерсы?
FThompson
2
хотел бы знать, где это отличается, как это отличается в управлении потоками ?? EX: Future.get () блокирует поток .... как это будет обрабатываться в Observable ???
shiv455
2
по крайней мере, это немного сбивает с толку для меня ... разница в высоком уровне будет очень полезно!
shiv455

Ответы:

281

фьючерсы

Фьючерсы были введены в Java 5 (2004). Они в основном заполнители для результата операции, которая еще не завершена. Как только операция Futureзавершится, результат будет содержать этот результат. Например, операция может быть экземпляром Runnable или Callable, который передается в ExecutorService . Отправитель операции может использовать Futureобъект, чтобы проверить, является ли операция isDone () , или дождаться ее завершения, используя блокирующий метод get () .

Пример:

/**
* A task that sleeps for a second, then returns 1
**/
public static class MyCallable implements Callable<Integer> {

    @Override
    public Integer call() throws Exception {
        Thread.sleep(1000);
        return 1;
    }

}

public static void main(String[] args) throws Exception{
    ExecutorService exec = Executors.newSingleThreadExecutor();
    Future<Integer> f = exec.submit(new MyCallable());

    System.out.println(f.isDone()); //False

    System.out.println(f.get()); //Waits until the task is done, then prints 1
}

CompletableFutures

CompletableFutures были введены в Java 8 (2014). На самом деле они представляют собой эволюцию обычных Futures, вдохновленных Google Listenable Futures , частью библиотеки Guava . Это фьючерсы, которые также позволяют вам связывать задачи в цепочку. Вы можете использовать их, чтобы сказать некоторому рабочему потоку «иди и выполни какое-нибудь задание X, а когда закончишь, иди делай другое, используя результат X». Используя CompletableFutures, вы можете что-то сделать с результатом операции, фактически не блокируя поток для ожидания результата. Вот простой пример:

/**
* A supplier that sleeps for a second, and then returns one
**/
public static class MySupplier implements Supplier<Integer> {

    @Override
    public Integer get() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            //Do nothing
        }
        return 1;
    }
}

/**
* A (pure) function that adds one to a given Integer
**/
public static class PlusOne implements Function<Integer, Integer> {

    @Override
    public Integer apply(Integer x) {
        return x + 1;
    }
}

public static void main(String[] args) throws Exception {
    ExecutorService exec = Executors.newSingleThreadExecutor();
    CompletableFuture<Integer> f = CompletableFuture.supplyAsync(new MySupplier(), exec);
    System.out.println(f.isDone()); // False
    CompletableFuture<Integer> f2 = f.thenApply(new PlusOne());
    System.out.println(f2.get()); // Waits until the "calculation" is done, then prints 2
}

RxJava

RxJava - это целая библиотека для реактивного программирования, созданная в Netflix. На первый взгляд, это будет похоже на потоки Java 8 . Это, за исключением того, что это намного мощнее.

Как и в случае с Futures, RxJava можно использовать для объединения нескольких синхронных или асинхронных действий для создания конвейера обработки. В отличие от Futures, которые являются одноразовыми, RxJava работает с потоками от нуля или более элементов. Включая нескончаемые потоки с бесконечным количеством элементов. Это также намного гибче и мощнее благодаря невероятно богатому набору операторов .

В отличие от потоков Java 8, RxJava также имеет механизм противодавления , который позволяет ему обрабатывать случаи, когда разные части вашего конвейера обработки работают в разных потоках с разной скоростью .

Недостатком RxJava является то, что, несмотря на солидную документацию, это сложная библиотека для изучения из-за изменения парадигмы. Код Rx также может быть кошмаром для отладки, особенно если задействованы несколько потоков, и еще хуже - если требуется обратное давление.

Если вы хотите в нее войти, на официальном сайте есть целая страница с различными учебными пособиями, а также официальная документация и Javadoc . Вы также можете взглянуть на некоторые видео, такие как это, которое дает краткое введение в Rx, а также рассказывает о различиях между Rx и Futures.

Бонус: Java 9 Reactive Streams

Реактивные потоки Java 9, также известные как Flow API, представляют собой набор интерфейсов, реализованных различными библиотеками реактивных потоков, такими как RxJava 2 , Akka Streams и Vertx . Они позволяют этим реактивным библиотекам соединяться, сохраняя при этом все важные противодавления.

солод
источник
Было бы неплохо привести пример кода того, как Rx делает это
Zinan Xing
Итак, используя Reactive Streams, мы можем смешивать RxJava, Akka и Vertx в одном приложении?
Игорь Ганапольский
1
@IgorGanapolsky Да.
солод
В CompletableFutures мы используем методы обратного вызова, эти методы обратного вызова также блокируются, если вывод одного метода является вводом другого обратного вызова. Как блок будущего с вызовом Future.get (). Почему сказано, что Future.get () блокирует вызов, а CompletableFutures не блокирует. Пожалуйста, объясните
Дипак
1
@ Федерико Конечно. Каждый Futureявляется заполнителем для одного результата, который может быть, а может и не завершен. Если вы снова выполните ту же операцию, вы получите новый Futureэкземпляр. RxJava имеет дело с потоками результатов, которые могут прийти в любое время. Таким образом, ряд операций может возвращать одну наблюдаемую RxJava, которая выкачает кучу результатов. Это немного похоже на разницу между одним почтовым конвертом и пневматической трубкой, которая продолжает откачивать почту.
солод
21

Я работаю с Rx Java с 0.9, сейчас на 1.3.2 и вскоре перехожу на 2.x Я использую это в частном проекте, над которым я работаю уже 8 лет.

Я бы больше не программировал без этой библиотеки. Вначале я был скептиком, но это совершенно другое состояние ума, которое вам нужно создать. Тихо трудно в начале. Я иногда часами смотрел на мрамор .. лол

Это просто вопрос практики и реального знакомства с потоком (он же контракт наблюдаемых и наблюдателя), как только вы попадете туда, вам не понравится делать это иначе.

Для меня нет ничего плохого в этой библиотеке.

Вариант использования: у меня есть вид монитора, который содержит 9 датчиков (процессор, память, сеть и т. Д.). При запуске представления оно подписывается на класс системного монитора, который возвращает наблюдаемый (интервал), который содержит все данные для 9 метров. Это будет выдвигать каждую секунду новый результат в представление (так что не опросить !!!). Эта наблюдаемая использует плоскую карту для одновременной (асинхронной!) Выборки данных из 9 различных источников и архивирования результата в новую модель, которую ваше представление получит в onNext ().

Как, черт возьми, ты собираешься делать это с фьючерсами, завершениями и т.д ... Удачи! :)

Rx Java решает многие проблемы в программировании для меня и делает его намного проще ...

Преимущества:

  • Statelss !!! (важно упомянуть, может быть, самое важное)
  • Управление потоками из коробки
  • Построить последовательности, которые имеют свой жизненный цикл
  • Все наблюдаемые, поэтому цепочка легко
  • Меньше кода для записи
  • Одиночная банка на classpath (очень легкий)
  • Сильно одновременно
  • Нет больше обратного вызова ад
  • На основе подписчика (жесткий контракт между потребителем и производителем)
  • Стратегии противодавления (автоматический выключатель и т.п.)
  • Великолепная обработка ошибок и восстановление
  • Очень хорошая документация (мрамор <3)
  • Полный контроль
  • Многое другое ...

Недостатки: - Трудно проверить

Кристоффер
источник
13
~ « Я бы больше не программировал без этой библиотеки. » Итак, RxJava - это конец всех программных проектов?
Игорь Ганапольский
Это полезно, даже если у меня нет потока асинхронных событий?
Мукеш Верма