Потоки Java 8 похожи на наблюдаемые RxJava?
Определение потока Java 8:
Классы в новом
java.util.stream
пакете предоставляют Stream API для поддержки операций в функциональном стиле над потоками элементов.
java-8
java-stream
rx-java
observable
rahulrv
источник
источник
Ответы:
TL; DR : Все библиотеки обработки последовательности / потока предлагают очень похожий API для построения конвейера. Различия в API для обработки многопоточности и составления конвейеров.
RxJava довольно сильно отличается от Stream. Из всех вещей JDK наиболее близким к rx.Observable является, возможно, комбо
java.util.stream.CollectorStream + CompletableFuture (которое обходится ценой работы с дополнительным слоем монады, то есть с обработкой преобразования междуStream<CompletableFuture<T>>
иCompletableFuture<Stream<T>>
).Существуют значительные различия между Observable и Stream:
Stream#parallel()
разбивает последовательность на разделыObservable#subscribeOn()
иObservable#observeOn()
не делает; эмулироватьStream#parallel()
поведение с Observable сложно , у него когда-то был.parallel()
метод, но этот метод вызвал столько путаницы, что.parallel()
поддержка была перемещена в отдельный репозиторий на github, RxJavaParallel. Подробнее в другом ответе .Stream#parallel()
не позволяет указывать пул потоков для использования, в отличие от большинства методов RxJava, принимающих необязательный планировщик. Поскольку все экземпляры потока в JVM используют один и тот же пул разветвления, добавление.parallel()
может случайно повлиять на поведение в другом модуле вашей программы.Observable#interval()
,Observable#window()
и многие другие; Это происходит главным образом потому, что потоки основаны на извлечении, и восходящий поток не имеет никакого контроля над тем, когда испускать следующий элемент ниже по потоку.takeWhile()
,takeUntil()
); использование обходного путиStream#anyMatch()
ограничено: это терминальная операция, поэтому вы не можете использовать его более одного раза для каждого потокаПотоки трудно построить самостоятельно, Observable может быть построена многими способамиEDIT: Как было отмечено в комментариях, есть способы построения потока. Однако, поскольку нет терминального короткого замыкания, вы, например, не можете легко сгенерировать Поток строк в файле (хотя JDK предоставляет строки Files # lines и BufferedReader # прямо из коробки, и другими подобными сценариями можно управлять, создавая Stream от итератора).Observable#using()
); вы можете обернуть его потоком ввода-вывода или мьютексом и быть уверенным, что пользователь не забудет освободить ресурс - он будет автоматически удален при прекращении подписки; У Stream естьonClose(Runnable)
метод, но вы должны вызывать его вручную или через try-with-resources. Например Вы должны иметь в виду, что Files # lines () должен быть заключен в блок try-with-resources.Обзор: RxJava значительно отличается от Streams. Реальные альтернативы RxJava - это другие реализации ReactiveStream , например, соответствующая часть Akka.
Обновление . Есть хитрость для использования пула разветвления не по умолчанию
Stream#parallel
, см. Пользовательский пул потоков в параллельном потоке Java 8Обновление . Все вышеперечисленное основано на опыте работы с RxJava 1.x. Теперь, когда RxJava 2.x уже здесь , этот ответ может быть устаревшим.
источник
Stream.generate()
и передать свою собственнуюSupplier<U>
реализацию, всего лишь один простой метод, из которого вы предоставляете следующий элемент в потоке. Есть множество других методов. Чтобы легко построить последовательность,Stream
которая зависит от предыдущих значений, вы можете использоватьinterate()
метод, каждый из которыхCollection
имеетstream()
метод иStream.of()
создаетStream
из varargs или массива. Наконец,StreamSupport
есть поддержка более продвинутого создания потоков с использованием сплитераторов или потоков примитивных типов.takeWhile()
,takeUntil()
);» - У JDK9 есть эти, я полагаю, функции takeWhile () и dropWhile ()Java 8 Stream и RxJava выглядят очень похоже. Они имеют похожие операторы (filter, map, flatMap ...), но не созданы для одного и того же использования.
Вы можете выполнять асинхронные задачи, используя RxJava.
С потоком Java 8 вы будете проходить элементы вашей коллекции.
Вы можете сделать почти то же самое в RxJava (обход элементов коллекции), но, поскольку RxJava сфокусирован на параллельной задаче, ..., он использует синхронизацию, защелку, ... Так что та же задача с использованием RxJava может быть медленнее, чем с потоком Java 8.
С RxJava можно сравнить
CompletableFuture
, но он может вычислять более одного значения.источник
parallelStream
поддерживает аналогичную синхронизацию простых прохождений / карт / фильтрации и т. д.Есть несколько технических и концептуальных различий, например, потоки Java 8 являются одноразовыми, основанными на извлечении, синхронными последовательностями значений, тогда как Наблюдаемые RxJava являются повторно наблюдаемыми, адаптивно основанными на двухтактных, потенциально асинхронных последовательностях значений. RxJava нацелен на Java 6+ и работает на Android.
источник
Потоки Java 8 основаны на потоке. Вы перебираете поток Java 8, потребляющий каждый элемент. И это может быть бесконечный поток.
RXJava
Observable
по умолчанию основан на push. Вы подписываетесь на Observable, и вы получите уведомление, когда прибывает следующий элемент (onNext
), или когда поток завершен (onCompleted
), или когда произошла ошибка (onError
). Потому что сObservable
вы получаетеonNext
,onCompleted
,onError
события, вы можете сделать некоторые мощные функции , такие как объединение различныхObservable
сек на новый (zip
,merge
,concat
). Другие вещи, которые вы могли бы сделать, это кэширование, регулирование ... И он использует более или менее один и тот же API на разных языках (RxJava, RX в C #, RxJS, ...)По умолчанию RxJava является однопоточным. Если вы не начнете использовать Планировщики, все будет происходить в одном потоке.
источник
Существующие ответы являются исчерпывающими и правильными, но для начинающих отсутствует четкий пример. Позвольте мне поставить некоторые конкретные термины, такие как «push / pull-based» и «re-наблюдаемый». Примечание : я ненавижу термин
Observable
(ради всего святого), поэтому просто буду ссылаться на потоки J8 и RX.Рассмотрим список целых чисел,
J8 Stream - это утилита для изменения коллекции. Например, даже цифры могут быть извлечены как,
Это в основном карта Python , фильтрация, уменьшение , очень хорошее (и давно назревшее) дополнение к Java. Но что, если цифры не были собраны раньше времени - что, если цифры были переданы во время работы приложения - мы могли бы отфильтровать четные числа в режиме реального времени.
Представьте, что отдельный процесс потока выводит целые числа в случайное время, пока приложение работает (
---
обозначает время)В RX
even
может реагировать на каждую новую цифру и применять фильтр в режиме реального времениНет необходимости хранить списки ввода и вывода. если ты хотите получить выходной список, нет проблем, это тоже можно стримировать. На самом деле все это поток.
Вот почему такие термины, как «без состояний» и «функционал», больше связаны с RX
источник
RxJava также тесно связан с инициативой реактивных потоков и считает себя простой реализацией API реактивных потоков (например, по сравнению с реализацией потоков Akka ). Основное отличие состоит в том, что реактивные потоки спроектированы так, чтобы они могли справляться с обратным давлением, но если вы заглянете на страницу реактивных потоков, вы поймете идею. Они очень хорошо описывают свои цели, и потоки также тесно связаны с реактивным манифестом .
Потоки Java 8 в значительной степени являются реализацией неограниченной коллекции, очень похожей на Scala Stream или Clojure lazy seq .
источник
Java 8 Streams позволяет эффективно обрабатывать действительно большие коллекции, используя многоядерные архитектуры. Напротив, RxJava является однопоточным по умолчанию (без планировщиков). Поэтому RxJava не будет использовать преимущества многоядерных машин, если вы сами не создадите эту логику.
источник