Внедрение эффективного разделителя перетасовки для Java Stream API

Сортировка экземпляра Stream проста и включает всего один вызов метода API — добиться обратного не так просто.

В этой статье мы увидим, как перетасовывать Stream в Java — жадно и лениво, используя фабрики Stream Collectors и пользовательские Spliterators.

Нетерпеливый коллекционер в случайном порядке

Одно из наиболее прагматичных решений вышеуказанной проблемы уже было описано Хайнцем. в этой статье.

В основном, это включает в себя инкапсуляцию составной операции сбора всего потока в список, коллекций#перетасовки его и преобразования в поток:

public static <T> Collector<T, ?, Stream<T>> toEagerShuffledStream() {
    return Collectors.collectingAndThen(
      toList(),
      list -> {
          Collections.shuffle(list);
          return list.stream();
      });
}

Это решение будет оптимальным, если мы хотим обработать все элементы потока в случайном порядке, но оно может дать отпор, если мы хотим обработать только небольшое их подмножество — все потому, что вся коллекция заранее перемешивается, даже если мы запрашиваем только одиночный элемент.

Давайте посмотрим на простой тест и результаты, которые он сгенерировал:

@State(Scope.Benchmark)
public class RandomSpliteratorBenchmark {

    private List<String> source;

    @Param({"1", "10", "100", "1000", "10000", "10000"})
    public int limit;

    @Param({"100000"})
    public int size;

    @Setup(Level.Iteration)
    public void setUp() {
        source = IntStream.range(0, size)
          .boxed()
          .map(Object::toString)
          .collect(Collectors.toList());
    }

    @Benchmark
    public List<String> eager() {
        return source.stream()
          .collect(toEagerShuffledStream())
          .limit(limit)
          .collect(Collectors.toList());
    }
            (limit)   Mode  Cnt     Score     Error  Units
eager             1  thrpt    5   467.796 ±   9.074  ops/s
eager            10  thrpt    5   467.694 ±  17.166  ops/s
eager           100  thrpt    5   459.765 ±   8.048  ops/s
eager          1000  thrpt    5   467.934 ±  43.095  ops/s
eager         10000  thrpt    5   449.471 ±   5.549  ops/s
eager        100000  thrpt    5   331.111 ±   5.626  ops/s

Как мы видим, он довольно хорошо масштабируется вместе с количеством элементов, потребляемых из результирующего Stream, жаль, что абсолютное значение не так впечатляет для относительно небольших чисел — в таких ситуациях предварительное перемешивание всей коллекции оказывается довольно расточительным. .

Давайте посмотрим, что мы можем с этим сделать.

Ленивый сборщик в случайном порядке

Чтобы сэкономить драгоценные циклы ЦП, вместо предварительной перетасовки всей коллекции мы можем просто получить количество элементов, соответствующее запросу восходящего потока.

Для этого нам нужно реализовать собственный Spliterator, который позволит нам перебирать объекты в случайном порядке, а затем мы сможем создать экземпляр Stream с помощью вспомогательного метода из класса StreamSupport:

public class RandomSpliterator<T> implements Spliterator<T> {

    // ...

    public static <T> Collector<T, ?, Stream<T>> toLazyShuffledStream() {
        return Collectors.collectingAndThen(
          toList(),
          list -> StreamSupport.stream(
            new ShuffledSpliterator<>(list), false));
    }
}

Мы не можем избежать оценки всего потока, даже если хотим выбрать один случайный элемент (что означает отсутствие поддержки бесконечных последовательностей), поэтому совершенно нормально инициировать наш RandomSpliterator с помощью List, но есть ловить…

Если конкретная реализация List не поддерживает произвольный доступ с постоянным временем, это решение может оказаться намного медленнее, чем нетерпеливый подход. Чтобы защитить себя от этого сценария, мы можем выполнить простую проверку при создании экземпляра Spliterator:

private RandomSpliterator(
  List<T> source, Supplier<? extends Random> random) {
    if (source.isEmpty()) { ... } // throw
    this.source = source instanceof RandomAccess 
      ? source 
      : new ArrayList<>(source);
    this.random = random.get();
}

Создание нового экземпляра ArrayList является дорогостоящим, но незначительным по сравнению с затратами, порождаемыми реализациями, которые не обеспечивают произвольный доступ O(1).

И теперь мы можем переопределить самый важный метод — tryAdvance().

В данном случае это довольно просто — на каждой итерации нам нужно случайным образом выбрать и удалить случайный элемент из исходной коллекции.

Мы можем не беспокоиться об изменении исходного кода, так как мы не публикуем RandomSpliterator, а только основанный на нем Collector:

@Override
public boolean tryAdvance(Consumer<? super T> action) {
    int remaining = source.size();
    action.accept(source.remove(random.nextInt(remaining)));
    return remaining - 1 > 0;
}

Помимо этого, нам нужно реализовать еще три метода:

@Override
public Spliterator<T> trySplit() {
    return null; // to indicate that split is not possible
}

@Override
public long estimateSize() {
    return source.size();
}

@Override
public int characteristics() {
    return SIZED;
}

А теперь попробуем и увидим, что это действительно работает:

IntStream.range(0, 10).boxed()
  .collect(toLazyShuffledStream())
  .forEach(System.out::println);

И результат:

3
4
8
1
7
6
5
0
2
9

Вопросы производительности

В этой реализации мы заменили N свопов элементов массива M поисками/удалениями, где:

N — размер коллекции
M — количество выбранных предметов
Как правило, один поиск/удаление из ArrayList является более дорогостоящей операцией, чем замена одного элемента, что делает это решение не таким масштабируемым, но значительно более производительным при относительно низких значениях M.

Давайте теперь посмотрим, как это решение соотносится с нетерпеливым подходом, продемонстрированным в начале (оба рассчитаны для коллекции, содержащей 100_000 объектов):

            (limit)   Mode  Cnt     Score     Error  Units
eager             1  thrpt    5   467.796 ±   9.074  ops/s
eager            10  thrpt    5   467.694 ±  17.166  ops/s
eager           100  thrpt    5   459.765 ±   8.048  ops/s
eager          1000  thrpt    5   467.934 ±  43.095  ops/s
eager         10000  thrpt    5   449.471 ±   5.549  ops/s
eager        100000  thrpt    5   331.111 ±   5.626  ops/s
lazy              1  thrpt    5  1530.763 ±  72.096  ops/s
lazy             10  thrpt    5  1462.305 ±  23.860  ops/s
lazy            100  thrpt    5   823.212 ± 119.771  ops/s
lazy           1000  thrpt    5   166.786 ±  16.306  ops/s
lazy          10000  thrpt    5    19.475 ±   4.052  ops/s
lazy         100000  thrpt    5     4.097 ±   0.416  ops/s

диаграмма-6.png

Как мы видим, это решение превосходит предыдущее, если количество обработанных элементов Stream относительно невелико, но по мере увеличения отношения обработано/размер_коллекции пропускная способность резко падает.

Это все из-за дополнительных накладных расходов, возникающих при удалении элементов из ArrayList, содержащих оставшиеся объекты — каждое удаление требует сдвига внутреннего массива на единицу с использованием относительно дорогого метода System#arraycopy.

Мы можем заметить аналогичную закономерность для гораздо больших коллекций (1_000_000 элементов):

      (limit)    (size)   Mode  Cnt  Score   Err  Units
eager       1  10000000  thrpt    5  0.915        ops/s
eager      10  10000000  thrpt    5  0.783        ops/s
eager     100  10000000  thrpt    5  0.965        ops/s
eager    1000  10000000  thrpt    5  0.936        ops/s
eager   10000  10000000  thrpt    5  0.860        ops/s
lazy        1  10000000  thrpt    5  4.338        ops/s
lazy       10  10000000  thrpt    5  3.149        ops/s
lazy      100  10000000  thrpt    5  2.060        ops/s
lazy     1000  10000000  thrpt    5  0.370        ops/s
lazy    10000  10000000  thrpt    5  0.05         ops/s

диаграмма-2-2.png

…и намного меньше (128 элементов, помните о масштабе!):

   (limit)    (size)   Mode  Cnt       Score   Error  Units
eager        2     128    thrpt    5  246439.459          ops/s
eager        4     128    thrpt    5  333866.936          ops/s
eager        8     128    thrpt    5  340296.188          ops/s
eager       16     128    thrpt    5  345533.673          ops/s
eager       32     128    thrpt    5  231725.156          ops/s
eager       64     128    thrpt    5  314324.265          ops/s
eager      128     128    thrpt    5  270451.992          ops/s
lazy         2     128    thrpt    5  765989.718          ops/s
lazy         4     128    thrpt    5  659421.041          ops/s
lazy         8     128    thrpt    5  652685.515          ops/s
lazy        16     128    thrpt    5  470346.570          ops/s
lazy        32     128    thrpt    5  324174.691          ops/s
lazy        64     128    thrpt    5  186472.090          ops/s
lazy       128     128    thrpt    5  108105.699          ops/s

диаграмма-3-2.png

Но можем ли мы сделать лучше, чем это?

Дальнейшие улучшения производительности

К сожалению, масштабируемость существующего решения весьма разочаровывает. Давайте попробуем улучшить его, но прежде чем мы это сделаем, мы должны сначала измерить:

Скриншот-2019-01-03-at-16.36.58.png

Как и ожидалось, Arraylist#remove оказывается одной из горячих точек — другими словами, ЦП тратит заметное количество времени на удаление элементов из ArrayList.

Почему это? Удаление из ArrayList включает удаление элемента из базового массива. Загвоздка в том, что массивы в Java не могут быть изменены — каждое удаление вызывает создание нового массива меньшего размера:

private void fastRemove(Object[] es, int i) {
    modCount++;
    final int newSize;
    if ((newSize = size - 1) > i)
        System.arraycopy(es, i + 1, es, i, newSize - i);
    es[size = newSize] = null;
}

Что мы можем с этим сделать? Избегайте удаления элементов из ArrayList.

Для этого мы могли бы хранить оставшиеся элементы в массиве и отслеживать его размер отдельно:

public class ImprovedRandomSpliterator<T> implements Spliterator<T> {

    private final Random random;
    private final T[] source;
    private int size;

    private ImprovedRandomSpliterator(
      List<T> source, Supplier<? extends Random> random) {
        if (source.isEmpty()) {
            throw new IllegalArgumentException(...);
        }
        this.source = (T[]) source.toArray();
        this.random = random.get();
        this.size = this.source.length;
    }
}

К счастью, мы можем избежать проблем с параллелизмом, поскольку экземпляры этого Spliterator не должны использоваться совместно между потоками.

И теперь всякий раз, когда мы пытаемся удалить элемент, нам не нужно фактически создавать новый уменьшенный массив. Вместо этого мы можем уменьшить размер нашего трекера и проигнорировать оставшуюся часть массива.

Но прямо перед этим нам нужно поменять местами последний элемент с возвращаемым элементом:

@Override
public boolean tryAdvance(Consumer<? super T> action) {
    int nextIdx = random.nextInt(size);
    int lastIdx = size - 1;

    action.accept(source[nextIdx]);
    source[nextIdx] = source[lastIdx];
    source[lastIdx] = null; // let object be GCed
    return --size > 0;
}

Если мы профилируем его сейчас, мы увидим, что дорогостоящий вызов исчез:
Скриншот-2019-01-03-at-16.38.47.png

Мы готовы повторно запустить тесты и сравнить:

               (limit)  (size)   Mode  Cnt     Score     Error  Units
eager                1  100000  thrpt    3   456.811 ±  20.585  ops/s
eager               10  100000  thrpt    3   469.635 ±  23.281  ops/s
eager              100  100000  thrpt    3   466.486 ±  68.820  ops/s
eager             1000  100000  thrpt    3   454.459 ±  13.103  ops/s
eager            10000  100000  thrpt    3   443.640 ±  96.929  ops/s
eager           100000  100000  thrpt    3   335.134 ±  21.944  ops/s
lazy                 1  100000  thrpt    3  1587.536 ± 389.128  ops/s
lazy                10  100000  thrpt    3  1452.855 ± 406.879  ops/s
lazy               100  100000  thrpt    3   814.978 ± 242.077  ops/s
lazy              1000  100000  thrpt    3   167.825 ± 129.559  ops/s
lazy             10000  100000  thrpt    3    19.782 ±   8.596  ops/s
lazy            100000  100000  thrpt    3     3.970 ±   0.408  ops/s
lazy_improved        1  100000  thrpt    3  1509.264 ± 170.423  ops/s
lazy_improved       10  100000  thrpt    3  1512.150 ± 143.927  ops/s
lazy_improved      100  100000  thrpt    3  1463.093 ± 593.370  ops/s
lazy_improved     1000  100000  thrpt    3  1451.007 ±  58.948  ops/s
lazy_improved    10000  100000  thrpt    3  1148.581 ± 232.218  ops/s
lazy_improved   100000  100000  thrpt    3   383.022 ±  97.082  ops/s

диаграмма-5-2.png

Как видите, мы получили реализацию, которая более устойчива с точки зрения производительности к количеству элементов, к которым мы стремимся.

На самом деле улучшенная реализация работает немного лучше, чем реализация на основе Collections#shuffle даже в пессимистическом сценарии!

Полный пример
…может быть и найдено на GitHub.

package com.pivovarit.stream;

import java.util.List;
import java.util.Random;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.function.Supplier;

public class ImprovedRandomSpliterator<T> implements Spliterator<T> {

    private final Random random;
    private final T[] source;
    private int size;

    ImprovedRandomSpliterator(List<T> source, Supplier<? extends Random> random) {
        if (source.isEmpty()) {
            throw new IllegalArgumentException("RandomSpliterator can't be initialized with an empty collection");
        }
        this.source = (T[]) source.toArray();
        this.random = random.get();
        this.size = this.source.length;
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> action) {
        int nextIdx = random.nextInt(size);
        int lastIdx = size - 1;

        action.accept(source[nextIdx]);
        source[nextIdx] = source[lastIdx];
        source[lastIdx] = null; // let object be GCed
        return --size > 0;
    }

    @Override
    public Spliterator<T> trySplit() {
        return null;
    }

    @Override
    public long estimateSize() {
        return source.length;
    }

    @Override
    public int characteristics() {
        return SIZED;
    }
}

package com.pivovarit.stream;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Random;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import static java.util.stream.Collectors.toCollection;

public final class RandomCollectors {

    private RandomCollectors() {
    }

    public static <T> Collector<T, ?, Stream<T>> toImprovedLazyShuffledStream() {
        return Collectors.collectingAndThen(
          toCollection(ArrayList::new),
          list -> !list.isEmpty()
            ? StreamSupport.stream(new ImprovedRandomSpliterator<>(list, Random::new), false)
            : Stream.empty());
    }

    public static <T> Collector<T, ?, Stream<T>> toLazyShuffledStream() {
        return Collectors.collectingAndThen(
          toCollection(ArrayList::new),
          list -> !list.isEmpty()
            ? StreamSupport.stream(new RandomSpliterator<>(list, Random::new), false)
            : Stream.empty());
    }

    public static <T> Collector<T, ?, Stream<T>> toEagerShuffledStream() {
        return Collectors.collectingAndThen(
          toCollection(ArrayList::new),
          list -> {
              Collections.shuffle(list);
              return list.stream();
          });
    }
}

Похожие записи

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *