Сжатие потоков с использованием JDK8 с использованием лямбды (java.util.stream.Streams.zip)

149

В JDK 8 с лямбдой b93 в b93 был класс java.util.stream.Streams.zip, который можно использовать для архивирования потоков (это показано в учебном пособии Dhananjay Nene, посвященном изучению Java8-лямбд. Часть 1 ). Эта функция:

Создает ленивый и последовательный объединенный поток, элементы которого являются результатом объединения элементов двух потоков.

Однако в b98 это исчезло. Infact Streamsкласс даже не доступен в java.util.stream в b98 .

Была ли эта функциональность перемещена, и если да, то как мне сжать потоки сжато, используя b98?

Приложение, которое я имею в виду, находится в Java-реализации Shen , где я заменил функциональность zip в

  • static <T> boolean every(Collection<T> c1, Collection<T> c2, BiPredicate<T, T> pred)
  • static <T> T find(Collection<T> c1, Collection<T> c2, BiPredicate<T, T> pred)

функции с довольно подробным кодом (который не использует функциональность из b98).

artella
источник
3
Ах, только что узнал, что, кажется, был полностью удален: mail.openjdk.java.net/pipermail/lambda-libs-spec-observers/…
artella
«Изучение лямбд Java8. Часть 1» - новая ссылка на эту статью - blog.dhananjaynene.com/2013/02/exploring-java8-lambdas-part-1
Алексей Егоров

Ответы:

77

Мне это тоже нужно было, поэтому я просто взял исходный код из b93 и поместил его в класс «util». Мне пришлось немного изменить его для работы с текущим API.

Для справки вот рабочий код (принимайте его на свой страх и риск ...):

public static<A, B, C> Stream<C> zip(Stream<? extends A> a,
                                     Stream<? extends B> b,
                                     BiFunction<? super A, ? super B, ? extends C> zipper) {
    Objects.requireNonNull(zipper);
    Spliterator<? extends A> aSpliterator = Objects.requireNonNull(a).spliterator();
    Spliterator<? extends B> bSpliterator = Objects.requireNonNull(b).spliterator();

    // Zipping looses DISTINCT and SORTED characteristics
    int characteristics = aSpliterator.characteristics() & bSpliterator.characteristics() &
            ~(Spliterator.DISTINCT | Spliterator.SORTED);

    long zipSize = ((characteristics & Spliterator.SIZED) != 0)
            ? Math.min(aSpliterator.getExactSizeIfKnown(), bSpliterator.getExactSizeIfKnown())
            : -1;

    Iterator<A> aIterator = Spliterators.iterator(aSpliterator);
    Iterator<B> bIterator = Spliterators.iterator(bSpliterator);
    Iterator<C> cIterator = new Iterator<C>() {
        @Override
        public boolean hasNext() {
            return aIterator.hasNext() && bIterator.hasNext();
        }

        @Override
        public C next() {
            return zipper.apply(aIterator.next(), bIterator.next());
        }
    };

    Spliterator<C> split = Spliterators.spliterator(cIterator, zipSize, characteristics);
    return (a.isParallel() || b.isParallel())
           ? StreamSupport.stream(split, true)
           : StreamSupport.stream(split, false);
}
Сики
источник
1
Разве результирующий поток не должен быть, SIZEDесли какой-либо поток SIZED, а не оба?
Дидье Л
5
Я так не думаю. Оба потока должны быть SIZEDдля этой реализации, чтобы работать. Это на самом деле зависит от того, как вы определяете молнию. Например, сможете ли вы сжать два потока разного размера? Как будет выглядеть результирующий поток? Я считаю, что именно поэтому эта функция была фактически исключена из API. Есть много способов сделать это, и пользователь должен решить, какое поведение должно быть «правильным». Вы бы отбросили элементы из более длинного потока или добавили более короткий список? Если да, то с какими значениями?
Сики
Если я что-то не упустил, нет необходимости в каком-либо приведении (например, к Spliterator<A>).
jub0bs
Существует ли веб-сайт, на котором размещен исходный код Java 8 b93? У меня проблемы с поиском.
Starwarswii
42

zip - одна из функций, предоставляемых библиотекой protonpack .

Stream<String> streamA = Stream.of("A", "B", "C");
Stream<String> streamB  = Stream.of("Apple", "Banana", "Carrot", "Doughnut");

List<String> zipped = StreamUtils.zip(streamA,
                                      streamB,
                                      (a, b) -> a + " is for " + b)
                                 .collect(Collectors.toList());

assertThat(zipped,
           contains("A is for Apple", "B is for Banana", "C is for Carrot"));
Доминик Фокс
источник
34

Если у вас есть Guava в вашем проекте, вы можете использовать метод Streams.zip (был добавлен в Guava 21):

Возвращает поток, в котором каждый элемент является результатом передачи соответствующего элемента каждого из streamA и streamB в функцию. Результирующий поток будет иметь длину, меньшую из двух входных потоков; если один поток длиннее, его дополнительные элементы будут игнорироваться. Результирующий поток не может быть эффективно разделен. Это может повредить параллельной производительности.

 public class Streams {
     ...

     public static <A, B, R> Stream<R> zip(Stream<A> streamA,
             Stream<B> streamB, BiFunction<? super A, ? super B, R> function) {
         ...
     }
 }
ZhekaKozlov
источник
26

Архивирование два потока с использованием JDK8 с лямбда ( сутью ).

public static <A, B, C> Stream<C> zip(Stream<A> streamA, Stream<B> streamB, BiFunction<A, B, C> zipper) {
    final Iterator<A> iteratorA = streamA.iterator();
    final Iterator<B> iteratorB = streamB.iterator();
    final Iterator<C> iteratorC = new Iterator<C>() {
        @Override
        public boolean hasNext() {
            return iteratorA.hasNext() && iteratorB.hasNext();
        }

        @Override
        public C next() {
            return zipper.apply(iteratorA.next(), iteratorB.next());
        }
    };
    final boolean parallel = streamA.isParallel() || streamB.isParallel();
    return iteratorToFiniteStream(iteratorC, parallel);
}

public static <T> Stream<T> iteratorToFiniteStream(Iterator<T> iterator, boolean parallel) {
    final Iterable<T> iterable = () -> iterator;
    return StreamSupport.stream(iterable.spliterator(), parallel);
}
Кароль Круль
источник
2
Хорошее решение и (относительно) компактный! Требуется, чтобы вы положили import java.util.function.*;и import java.util.stream.*;в верхней части вашего файла.
SCPC
Обратите внимание, что это терминальная операция в потоке. Это означает, что для бесконечных потоков этот метод ломается
smac89
2
Столько бесполезных обёрток: здесь () -> iteratorи здесь снова iterable.spliterator(). Почему бы не реализовать непосредственно, Spliteratorа не Iterator? Проверьте @Doradus ответ stackoverflow.com/a/46230233/1140754
Мигель Гамбоа
20

Поскольку я не могу представить себе какое-либо использование архивирования на коллекциях, кроме индексированных (списков), и я большой поклонник простоты, это было бы моим решением:

<A,B,C>  Stream<C> zipped(List<A> lista, List<B> listb, BiFunction<A,B,C> zipper){
     int shortestLength = Math.min(lista.size(),listb.size());
     return IntStream.range(0,shortestLength).mapToObj( i -> {
          return zipper.apply(lista.get(i), listb.get(i));
     });        
}
Рафаэль
источник
1
Я думаю, что mapToObjectдолжно быть mapToObj.
seanf
если список не RandomAccess(например, в связанных списках), это будет очень медленно
avmohan
Определенно. Но большинство разработчиков Java хорошо знают, что LinkedList имеет низкую производительность для операций доступа к индексу.
Рафаэль
11

Методы упомянутого вами класса были перенесены в сам Streamинтерфейс в пользу методов по умолчанию. Но похоже, что zipметод был удален. Возможно, потому что не ясно, как должно быть поведение по умолчанию для потоков разных размеров. Но реализация желаемого поведения проста:

static <T> boolean every(
  Collection<T> c1, Collection<T> c2, BiPredicate<T, T> pred) {
    Iterator<T> it=c2.iterator();
    return c1.stream().allMatch(x->!it.hasNext()||pred.test(x, it.next()));
}
static <T> T find(Collection<T> c1, Collection<T> c2, BiPredicate<T, T> pred) {
    Iterator<T> it=c2.iterator();
    return c1.stream().filter(x->it.hasNext()&&pred.test(x, it.next()))
      .findFirst().orElse(null);
}
Holger
источник
Разве predicateвы не передали фильтр с состоянием ? Это нарушает контракт метода и особенно не будет работать при параллельной обработке потока.
Андреас
2
@Andreas: ни одно из решений не поддерживает параллельную обработку. Поскольку мои методы не возвращают поток, они удостоверяются, что потоки не работают параллельно. Точно так же код принятого ответа возвращает поток, который можно превратить в параллель, но на самом деле ничего не будет делать параллельно. Тем не менее, statefull предикаты не рекомендуется, но не нарушают контракт. Они могут даже использоваться в параллельном контексте, если вы убедитесь, что обновление состояния является поточно-ориентированным. В некоторых случаях они неизбежны, например , превращение потока в отличие является statefull предикат сам по себе .
Хольгер
2
@Andreas: вы можете догадаться, почему эти операции были удалены из Java API…
Хольгер,
8

Я смиренно предлагаю эту реализацию. Результирующий поток усекается до более короткого из двух входных потоков.

public static <L, R, T> Stream<T> zip(Stream<L> leftStream, Stream<R> rightStream, BiFunction<L, R, T> combiner) {
    Spliterator<L> lefts = leftStream.spliterator();
    Spliterator<R> rights = rightStream.spliterator();
    return StreamSupport.stream(new AbstractSpliterator<T>(Long.min(lefts.estimateSize(), rights.estimateSize()), lefts.characteristics() & rights.characteristics()) {
        @Override
        public boolean tryAdvance(Consumer<? super T> action) {
            return lefts.tryAdvance(left->rights.tryAdvance(right->action.accept(combiner.apply(left, right))));
        }
    }, leftStream.isParallel() || rightStream.isParallel());
}
Doradus
источник
Мне нравится ваше предложение. Но я не совсем согласен с последним .., leftStream.isParallel() || rightStream.isParallel(). Я думаю, что это не имеет никакого эффекта, потому что AbstractSpliteratorпредлагает ограниченный параллелизм по умолчанию. Поэтому я думаю, что окончательный результат будет таким же, как и при прохождении false.
Мигель Гамбоа
@MiguelGamboa - спасибо за ваш комментарий. Я не уверен, что вы подразумеваете под "ограниченным параллелизмом по умолчанию" - у вас есть ссылка на некоторые документы?
Дорадус
6

Библиотека Lazy-Seq обеспечивает функциональность zip.

https://github.com/nurkiewicz/LazySeq

Эта библиотека вдохновлена scala.collection.immutable.Streamи нацелена на обеспечение неизменной, поточно-ориентированной и простой в использовании реализации отложенных последовательностей, возможно, бесконечной.

Ник Сидеракис
источник
5

Используя последнюю версию библиотеки Guava (для Streamsкласса), вы должны уметь

final Map<String, String> result = 
    Streams.zip(
        collection1.stream(), 
        collection2.stream(), 
        AbstractMap.SimpleEntry::new)
    .collect(Collectors.toMap(e -> e.getKey(), e  -> e.getValue()));
Дэн Борза
источник
2

Будет ли это работать для вас? Это короткая функция, которая лениво оценивает потоки, которые она упаковывает, так что вы можете снабдить ее бесконечными потоками (ей не нужно брать размер потоков, которые будут сжаты).

Если потоки конечны, он останавливается, как только в одном из потоков заканчиваются элементы.

import java.util.Objects;
import java.util.function.BiFunction;
import java.util.stream.Stream;

class StreamUtils {
    static <ARG1, ARG2, RESULT> Stream<RESULT> zip(
            Stream<ARG1> s1,
            Stream<ARG2> s2,
            BiFunction<ARG1, ARG2, RESULT> combiner) {
        final var i2 = s2.iterator();
        return s1.map(x1 -> i2.hasNext() ? combiner.apply(x1, i2.next()) : null)
                .takeWhile(Objects::nonNull);
    }
}

Вот некоторый код модульного теста (намного длиннее, чем сам код!)

import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.junit.jupiter.api.Assertions.assertEquals;

class StreamUtilsTest {
    @ParameterizedTest
    @MethodSource("shouldZipTestCases")
    <ARG1, ARG2, RESULT>
    void shouldZip(
            String testName,
            Stream<ARG1> s1,
            Stream<ARG2> s2,
            BiFunction<ARG1, ARG2, RESULT> combiner,
            Stream<RESULT> expected) {
        var actual = StreamUtils.zip(s1, s2, combiner);

        assertEquals(
                expected.collect(Collectors.toList()),
                actual.collect(Collectors.toList()),
                testName);
    }

    private static Stream<Arguments> shouldZipTestCases() {
        return Stream.of(
                Arguments.of(
                        "Two empty streams",
                        Stream.empty(),
                        Stream.empty(),
                        (BiFunction<Object, Object, Object>) StreamUtilsTest::combine,
                        Stream.empty()),
                Arguments.of(
                        "One singleton and one empty stream",
                        Stream.of(1),
                        Stream.empty(),
                        (BiFunction<Object, Object, Object>) StreamUtilsTest::combine,
                        Stream.empty()),
                Arguments.of(
                        "One empty and one singleton stream",
                        Stream.empty(),
                        Stream.of(1),
                        (BiFunction<Object, Object, Object>) StreamUtilsTest::combine,
                        Stream.empty()),
                Arguments.of(
                        "Two singleton streams",
                        Stream.of("blah"),
                        Stream.of(1),
                        (BiFunction<Object, Object, Object>) StreamUtilsTest::combine,
                        Stream.of(pair("blah", 1))),
                Arguments.of(
                        "One singleton, one multiple stream",
                        Stream.of("blob"),
                        Stream.of(2, 3),
                        (BiFunction<Object, Object, Object>) StreamUtilsTest::combine,
                        Stream.of(pair("blob", 2))),
                Arguments.of(
                        "One multiple, one singleton stream",
                        Stream.of("foo", "bar"),
                        Stream.of(4),
                        (BiFunction<Object, Object, Object>) StreamUtilsTest::combine,
                        Stream.of(pair("foo", 4))),
                Arguments.of(
                        "Two multiple streams",
                        Stream.of("nine", "eleven"),
                        Stream.of(10, 12),
                        (BiFunction<Object, Object, Object>) StreamUtilsTest::combine,
                        Stream.of(pair("nine", 10), pair("eleven", 12)))
        );
    }

    private static List<Object> pair(Object o1, Object o2) {
        return List.of(o1, o2);
    }

    static private <T1, T2> List<Object> combine(T1 o1, T2 o2) {
        return List.of(o1, o2);
    }

    @Test
    void shouldLazilyEvaluateInZip() {
        final var a = new AtomicInteger();
        final var b = new AtomicInteger();
        final var zipped = StreamUtils.zip(
                Stream.generate(a::incrementAndGet),
                Stream.generate(b::decrementAndGet),
                (xa, xb) -> xb + 3 * xa);

        assertEquals(0, a.get(), "Should not have evaluated a at start");
        assertEquals(0, b.get(), "Should not have evaluated b at start");

        final var takeTwo = zipped.limit(2);

        assertEquals(0, a.get(), "Should not have evaluated a at take");
        assertEquals(0, b.get(), "Should not have evaluated b at take");

        final var list = takeTwo.collect(Collectors.toList());

        assertEquals(2, a.get(), "Should have evaluated a after collect");
        assertEquals(-2, b.get(), "Should have evaluated b after collect");
        assertEquals(List.of(2, 4), list);
    }
}
Dominic
источник
В takeWhileконце я должен был опустить то, чего нет в java8, но это не проблема, так как вызываемый абонент может отфильтровывать любые нули, возникающие, когда сжатые потоки имеют разный размер. я думаю, что этот ответ должен быть ответом номер 1, так как он состоит и понятен. отличная работа, спасибо еще раз.
simbo1905
1
public class Tuple<S,T> {
    private final S object1;
    private final T object2;

    public Tuple(S object1, T object2) {
        this.object1 = object1;
        this.object2 = object2;
    }

    public S getObject1() {
        return object1;
    }

    public T getObject2() {
        return object2;
    }
}


public class StreamUtils {

    private StreamUtils() {
    }

    public static <T> Stream<Tuple<Integer,T>> zipWithIndex(Stream<T> stream) {
        Stream<Integer> integerStream = IntStream.range(0, Integer.MAX_VALUE).boxed();
        Iterator<Integer> integerIterator = integerStream.iterator();
        return stream.map(x -> new Tuple<>(integerIterator.next(), x));
    }
}
robby_pelssers
источник
1

Циклоп-реакция AOL , в которую я участвую, также обеспечивает функциональность архивирования, как с помощью расширенной реализации Stream , которая также реализует интерфейс реактивных потоков ReactiveSeq, так и с помощью StreamUtils, которая предлагает большую часть тех же функций через статические методы для стандартных потоков Java.

 List<Tuple2<Integer,Integer>> list =  ReactiveSeq.of(1,2,3,4,5,6)
                                                  .zip(Stream.of(100,200,300,400));


  List<Tuple2<Integer,Integer>> list = StreamUtils.zip(Stream.of(1,2,3,4,5,6),
                                                  Stream.of(100,200,300,400));

Он также предлагает более обобщенную молнию на основе Applicative. Например

   ReactiveSeq.of("a","b","c")
              .ap3(this::concat)
              .ap(of("1","2","3"))
              .ap(of(".","?","!"))
              .toList();

   //List("a1.","b2?","c3!");

   private String concat(String a, String b, String c){
    return a+b+c;
   }

И даже возможность сопряжения каждого элемента в одном потоке с каждым элементом в другом

   ReactiveSeq.of("a","b","c")
              .forEach2(str->Stream.of(str+"!","2"), a->b->a+"_"+b);

   //ReactiveSeq("a_a!","a_2","b_b!","b_2","c_c!","c2")
Джон МакКлин
источник
0

Если это кому-то еще нужно, StreamEx.zipWithв библиотеке streamex есть функция :

StreamEx<String> givenNames = StreamEx.of("Leo", "Fyodor")
StreamEx<String> familyNames = StreamEx.of("Tolstoy", "Dostoevsky")
StreamEx<String> fullNames = givenNames.zipWith(familyNames, (gn, fn) -> gn + " " + fn);

fullNames.forEach(System.out::println);  // prints: "Leo Tolstoy\nFyodor Dostoevsky\n"
const.grigoryev
источник
-1

Это круто. Я должен был сжать два потока в карту с одним потоком является ключом, а другой является значением

Stream<String> streamA = Stream.of("A", "B", "C");
Stream<String> streamB  = Stream.of("Apple", "Banana", "Carrot", "Doughnut");    
final Stream<Map.Entry<String, String>> s = StreamUtils.zip(streamA,
                    streamB,
                    (a, b) -> {
                        final Map.Entry<String, String> entry = new AbstractMap.SimpleEntry<String, String>(a, b);
                        return entry;
                    });

System.out.println(s.collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue())));

Выход: {A = яблоко, B = банан, C = морковь}

Гнана
источник