Как создать несколько потоков для каждого элемента запроса

9

Я пытаюсь обработать код ниже, используя многопоточность на уровне заказа.

List<String> orders = Arrays.asList("order1", "order2", 
                   "order3", "order4", "order1");

Текущее последовательное исполнение:

orders.stream().forEach(order -> {
    rules.forEach(rule -> {
        finalList.add(beanMapper.getBean(rule)
                .applyRule(createTemplate.apply(getMetaData.apply(rule), command),
                           order));
    });
});

Я пытался с помощью:

orders.parallelStream().forEach(order -> {}} // code snippet.

Но это меняет порядок rules.forEach (rule -> {}} .

Например:
Ввод:

 List<String> orders = Arrays.asList("order1", "order2", 
                         "order3", "order4", "order1");
 List<String> rules = Arrays.asList("rule1", "rule2", "rule3");

Ожидаемый результат:

order1 with rule1, rule2, rule3
order2 with rule1, rule2, rule3

Фактический результат с parallelStream():

order1 with rule3, rule1, rule2
order1 with rule2, rule1, rule3

Меня не беспокоит порядок заказов , но меня беспокоит порядок правил . Заказы могут обрабатываться в любом порядке, но правила должны выполняться в том же порядке для каждого заказа.

Пожалуйста помоги.

Майянк Бишт
источник

Ответы:

4

Ты можешь использовать :

orders.stream().parallel().forEachOrdered(// Your rules logic goes here. )

ForEachOrdered гарантирует сохранение порядка потока.

Итак, для вашей справки:

orders.stream().parallel().forEachOrdered( order -> {

            rules.stream().parallel().forEachOrdered ( rule -> {

                 System.out.println( " Order : " + order + " rule :" + rule);
            });

        });

Примечание: хотя мы можем это сделать, за производительностью следует внимательно следить, потому что пареллизм и порядок плохо сочетаются друг с другом!

Вывод

 Order : order1 rule :rule1
 Order : order1 rule :rule2
 Order : order1 rule :rule3
 Order : order2 rule :rule1
 Order : order2 rule :rule2
 Order : order2 rule :rule3
 Order : order3 rule :rule1
 Order : order3 rule :rule2
 Order : order3 rule :rule3
 Order : order4 rule :rule1
 Order : order4 rule :rule2
 Order : order4 rule :rule3
 Order : order1 rule :rule1
 Order : order1 rule :rule2
 Order : order1 rule :rule3
Прамод С. Никам
источник
Спасибо за ответ. forEachOrdered гарантирует порядок потока, но также снижает производительность. Я попробовал это, и приложение занимает время, подобное последовательной обработке. stream () .rallel & forEachOrdered не дополняют друг друга.
Майянк Бишт
Да, я согласен, что мы должны сделать полный анализ задержек, прежде чем делать это.
Прамод С. Никам
Да, я получаю ту же производительность с этим, нет никаких улучшений.
Майянк Бишт
1
Внимательно следите за этой веткой, чтобы получить лучшее решение для этого.
Прамод С. Никам
Могу ли я достичь параллельной обработки, используя ExecutorService?
Майянк Бишт
1

Вы добавляете элементы в finalListразные потоки одновременно. Это вызывает смешивание результатов применения правил к различным заказам (правила не группируются по их заказам).

Вы можете исправить это, создав временный список для каждого, orderа затем синхронно объединить все временные списки в finalList.

Вот как вы можете сделать это с помощью Stream-API (Java 9+):

List<AppliedRule> finalList = orders.parallelStream().map(order ->
        rules.stream().map(rule -> applyRule(order, rule)).collect(Collectors.toList())
).collect(Collectors.flatMapping(Collection::stream, Collectors.toList()));

Примечание: Collectors.flatMapping()здесь используется вместо простого flatMapдля синхронного запуска плоского отображения во время сбора потока.


Аналог Java 8:

List<AppliedRule> finalList = orders.parallelStream().map(order ->
        rules.stream().map(rule -> applyRule(order, rule)).collect(Collectors.toList())
).collect(Collectors.toList())
        .stream()
        .flatMap(Collection::stream)
        .collect(Collectors.toList());
Bananon
источник
Спасибо за ответ. Я попробовал ваш подход, и я получаю java.util.ConcurrentModificationException: null
Mayank Bisht
finalList = orders.parallelStream () .map (order -> rules.stream () .map (rule -> beanMapper.getBean (rule) .applyRule (createTemplate.apply (getMetaData.apply (rule), command), order)) .collect (Collectors.toList ())) Collect (Collectors.toList ()) поток () flatMap (Коллекция :: поток) .collect (Collectors.toList (.))..;
Mayank Bisht
@mayankbisht, это означает, что beanMapper.getBean(rule) .applyRule(createTemplate.apply(getMetaData.apply(rule), command), order)это не чистая функция, поэтому ее нельзя использовать параллельно. Попробуйте удалить все побочные эффекты от него; ConcurrentModificationExceptionтрассировка стека может помочь найти их.
Бананон
0

Будет ли это работать?

final int rulesSize = rules.size();
AtomicInteger atomicInteger = new AtomicInteger(0);
orders.stream().parallel().forEach(order -> {
    IntStream.range(0, rulesSize).parallel().forEach( i -> {
        synchronized (atomicInteger) {
            System.out.println(" Order : " + order + " rule :" + rules.get(atomicInteger.getAndIncrement() % rulesSize));
        }
    });
});

Вывод

 Order : order1 rule :rule1
 Order : order4 rule :rule2
 Order : order1 rule :rule3
 Order : order3 rule :rule1
 Order : order3 rule :rule2
 Order : order3 rule :rule3
 Order : order2 rule :rule1
 Order : order2 rule :rule2
 Order : order2 rule :rule3
 Order : order1 rule :rule1
 Order : order1 rule :rule2
 Order : order4 rule :rule3
 Order : order1 rule :rule1
 Order : order4 rule :rule2
 Order : order1 rule :rule3
Бхавани Сингх
источник
0

Порядок заказов может быть любым, но порядок правил не должен меняться. Также для определенного порядка правило должно прийти в группу.

Если это так, нет места для фактического параллелизма.

когда

order1-rule1
order1-rule2
order2-rule1
order2-rule2

а также

order2-rule1
order2-rule2
order1-rule1
order1-rule2

являются единственными действительными прогонами для 2 заказов и 2 правил,
и

order1-rule1
order2-rule1
order1-rule2
order2-rule2

считается недействительным, это не параллелизм, а просто рандомизация orders, предположительно без усиления. Если вам «надоело» order1приходить первым, вы можете перетасовать список, но это все:

public static void main (String[] args) throws java.lang.Exception
{
    List<String> orders = Arrays.asList("order1", "order2", "order3", "order4");
    List<String> rules = Arrays.asList("rule1", "rule2", "rule3");
    Collections.shuffle(orders);
    orders.forEach(order->{
        rules.forEach(rule->{
            System.out.println(order+"-"+rule);
        });
    });
}

Даже потоковая передача не нужна, просто две вложенные петли. Тест: https://ideone.com/qI3dqd

order2-rule1
order2-rule2
order2-rule3
order4-rule1
order4-rule2
order4-rule3
order1-rule1
order1-rule2
order1-rule3
order3-rule1
order3-rule2
order3-rule3


Оригинальный ответ

Но это меняет порядок rules.forEach (rule -> {}}.

Нет. В orderы могут перекрывать друг друга, но порядок ruleс для каждого заказа сохраняются. Почему непараллельный должен forEachделать что-то еще?

Пример кода:

public static void main (String[] args) throws java.lang.Exception
{
    List<String> orders = Arrays.asList("order1", "order2", "order3", "order4");
    List<String> rules = Arrays.asList("rule1", "rule2", "rule3");
    orders.stream().parallel().forEach(order->{
        rules.forEach(rule->{
            System.out.println(order+"-"+rule);
        });
    });
}

Тест: https://ideone.com/95Cybg
Пример вывода:

order2-rule1
order2-rule2
order2-rule3
order1-rule1
order1-rule2
order1-rule3
order4-rule1
order4-rule2
order4-rule3
order3-rule1
order3-rule2
order3-rule3

Порядок orders смешан, но rules всегда 1-2-3. Я думаю, что ваш вывод просто скрыл пары (на самом деле вы не показали, как это было сгенерировано).

Конечно, это может быть расширено с некоторыми задержками, поэтому обработка orders будет фактически перекрываться:

public static void delay(){
    try{
        Thread.sleep(ThreadLocalRandom.current().nextInt(100,300));
    }catch(Exception ex){}
}

public static void main (String[] args) throws java.lang.Exception
{
    List<String> orders = Arrays.asList("order1", "order2", "order3", "order4");
    List<String> rules = Arrays.asList("rule1", "rule2", "rule3");
    orders.stream().parallel().forEach(order->{
        rules.forEach(rule->{
            delay();
            System.out.println(order+"-"+rule);
        });
    });
}

Тест: https://ideone.com/cSFaqS
Пример вывода:

order3-rule1
order2-rule1
order2-rule2
order3-rule2
order3-rule3
order2-rule3
order1-rule1
order4-rule1
order1-rule2
order4-rule2
order4-rule3
order1-rule3

Это может быть что-то, что вы видели, просто без orderxчасти. С orderвидимыми s можно отследить, что rules продолжают прибывать как 1-2-3, перorder . Кроме того, ваш список примеров содержал order1дважды, что наверняка не помогло увидеть, что происходит.

tevemadar
источник
Спасибо за ответ. Приведенный выше вывод может быть правильным для меньшего количества заказов. Но если вы увеличите заказы, вы получите другой результат. Например, (order4-rule1, order4-rule2, order4-rule1) (order1-rule1, order1-rule2) (order3-rule1, order3-rule2) (order4-rule1, order4-rule2, order4-rule1, order4-rule2).
Mayank Bisht
Порядок заказов может быть любым, но порядок правил не должен меняться. Также для определенного порядка правило должно прийти в группу. Например (order1- правило 1, order1-rule2, order1-rule3) и нет (order1-rule1, order2-rule1, order1-rule2, order1-rule3).)
майянский Биш,
@mayankbisht Я думаю, что эти ограничения просто не позволяют параллельную обработку. Смотрите обновленный ответ (я написал новую часть в начале).
tevemadar
Да, я понимаю это, и именно поэтому я разместил этот вопрос здесь. Я подумал, что, может быть, найдется другой способ сделать это, или, может быть, мы можем изменить
алгоритм
@mayankbisht Вы могли бы описать, почему orders не может перекрываться (возможно, они ruleс состоянием и существуют в ограниченном количестве копий, возможно, только в одной?). Но, как правило, нет параллелизма без параллельных вещей, в этом и заключается весь смысл параллелизма.
tevemadar
0

Если вы не против попробовать стороннюю библиотеку. Вот пример с моей библиотекой: abacus-util

StreamEx.of(orders).parallelStream().forEach(order -> {}}

И вы даже можете указать номер потока:

StreamEx.of(orders).parallelStream(maxThreadNum).forEach(order -> {}}

Порядок ruleбудет сохранен.

Кстати, поскольку он находится в параллельном потоке, фрагмент кода, ...finalList.add(...скорее всего, не будет работать. Я думаю, что лучше собрать результат в список:

StreamEx.of(orders).parallelStream().map/flatMap(order -> {...}}.toList()

это также выполнимо, даже если вы хотите сохранить порядок orderпо какой-то причине позже:

StreamEx.of(orders).indexed().parallelStream()
      .map/flatMap(order -> {...}}.sortedBy(...index).toList()
user_3380739
источник