Внедрение эффективного разделителя перетасовки для Java Stream API
Сортировка экземпляра Stream проста и включает всего один вызов метода API — добиться обратного не так просто.
В этой статье мы увидим, как перетасовывать Stream в Java — жадно и лениво, используя фабрики Stream Collectors и пользовательские Spliterators.
Нетерпеливый коллекционер в случайном порядке
Одно из наиболее прагматичных решений вышеуказанной проблемы уже было описано Хайнцем. в этой статье.
В основном, это включает в себя инкапсуляцию составной операции сбора всего потока в список, коллекций#перетасовки его и преобразования в поток:
public static <T> Collector<T, ?, Stream<T>> toEagerShuffledStream() {
return Collectors.collectingAndThen(
toList(),
list -> {
Collections.shuffle(list);
return list.stream();
});
}
Это решение будет оптимальным, если мы хотим обработать все элементы потока в случайном порядке, но оно может дать отпор, если мы хотим обработать только небольшое их подмножество — все потому, что вся коллекция заранее перемешивается, даже если мы запрашиваем только одиночный элемент.
Давайте посмотрим на простой тест и результаты, которые он сгенерировал:
@State(Scope.Benchmark)
public class RandomSpliteratorBenchmark {
private List<String> source;
@Param({"1", "10", "100", "1000", "10000", "10000"})
public int limit;
@Param({"100000"})
public int size;
@Setup(Level.Iteration)
public void setUp() {
source = IntStream.range(0, size)
.boxed()
.map(Object::toString)
.collect(Collectors.toList());
}
@Benchmark
public List<String> eager() {
return source.stream()
.collect(toEagerShuffledStream())
.limit(limit)
.collect(Collectors.toList());
}
(limit) Mode Cnt Score Error Units
eager 1 thrpt 5 467.796 ± 9.074 ops/s
eager 10 thrpt 5 467.694 ± 17.166 ops/s
eager 100 thrpt 5 459.765 ± 8.048 ops/s
eager 1000 thrpt 5 467.934 ± 43.095 ops/s
eager 10000 thrpt 5 449.471 ± 5.549 ops/s
eager 100000 thrpt 5 331.111 ± 5.626 ops/s
Как мы видим, он довольно хорошо масштабируется вместе с количеством элементов, потребляемых из результирующего Stream, жаль, что абсолютное значение не так впечатляет для относительно небольших чисел — в таких ситуациях предварительное перемешивание всей коллекции оказывается довольно расточительным. .
Давайте посмотрим, что мы можем с этим сделать.
Ленивый сборщик в случайном порядке
Чтобы сэкономить драгоценные циклы ЦП, вместо предварительной перетасовки всей коллекции мы можем просто получить количество элементов, соответствующее запросу восходящего потока.
Для этого нам нужно реализовать собственный Spliterator, который позволит нам перебирать объекты в случайном порядке, а затем мы сможем создать экземпляр Stream с помощью вспомогательного метода из класса StreamSupport:
public class RandomSpliterator<T> implements Spliterator<T> {
// ...
public static <T> Collector<T, ?, Stream<T>> toLazyShuffledStream() {
return Collectors.collectingAndThen(
toList(),
list -> StreamSupport.stream(
new ShuffledSpliterator<>(list), false));
}
}
Мы не можем избежать оценки всего потока, даже если хотим выбрать один случайный элемент (что означает отсутствие поддержки бесконечных последовательностей), поэтому совершенно нормально инициировать наш RandomSpliterator
Если конкретная реализация List не поддерживает произвольный доступ с постоянным временем, это решение может оказаться намного медленнее, чем нетерпеливый подход. Чтобы защитить себя от этого сценария, мы можем выполнить простую проверку при создании экземпляра Spliterator:
private RandomSpliterator(
List<T> source, Supplier<? extends Random> random) {
if (source.isEmpty()) { ... } // throw
this.source = source instanceof RandomAccess
? source
: new ArrayList<>(source);
this.random = random.get();
}
Создание нового экземпляра ArrayList является дорогостоящим, но незначительным по сравнению с затратами, порождаемыми реализациями, которые не обеспечивают произвольный доступ O(1).
И теперь мы можем переопределить самый важный метод — tryAdvance().
В данном случае это довольно просто — на каждой итерации нам нужно случайным образом выбрать и удалить случайный элемент из исходной коллекции.
Мы можем не беспокоиться об изменении исходного кода, так как мы не публикуем RandomSpliterator, а только основанный на нем Collector:
@Override
public boolean tryAdvance(Consumer<? super T> action) {
int remaining = source.size();
action.accept(source.remove(random.nextInt(remaining)));
return remaining - 1 > 0;
}
Помимо этого, нам нужно реализовать еще три метода:
@Override
public Spliterator<T> trySplit() {
return null; // to indicate that split is not possible
}
@Override
public long estimateSize() {
return source.size();
}
@Override
public int characteristics() {
return SIZED;
}
А теперь попробуем и увидим, что это действительно работает:
IntStream.range(0, 10).boxed()
.collect(toLazyShuffledStream())
.forEach(System.out::println);
И результат:
3
4
8
1
7
6
5
0
2
9
Вопросы производительности
В этой реализации мы заменили N свопов элементов массива M поисками/удалениями, где:
N — размер коллекции
M — количество выбранных предметов
Как правило, один поиск/удаление из ArrayList является более дорогостоящей операцией, чем замена одного элемента, что делает это решение не таким масштабируемым, но значительно более производительным при относительно низких значениях M.
Давайте теперь посмотрим, как это решение соотносится с нетерпеливым подходом, продемонстрированным в начале (оба рассчитаны для коллекции, содержащей 100_000 объектов):
(limit) Mode Cnt Score Error Units
eager 1 thrpt 5 467.796 ± 9.074 ops/s
eager 10 thrpt 5 467.694 ± 17.166 ops/s
eager 100 thrpt 5 459.765 ± 8.048 ops/s
eager 1000 thrpt 5 467.934 ± 43.095 ops/s
eager 10000 thrpt 5 449.471 ± 5.549 ops/s
eager 100000 thrpt 5 331.111 ± 5.626 ops/s
lazy 1 thrpt 5 1530.763 ± 72.096 ops/s
lazy 10 thrpt 5 1462.305 ± 23.860 ops/s
lazy 100 thrpt 5 823.212 ± 119.771 ops/s
lazy 1000 thrpt 5 166.786 ± 16.306 ops/s
lazy 10000 thrpt 5 19.475 ± 4.052 ops/s
lazy 100000 thrpt 5 4.097 ± 0.416 ops/s
Как мы видим, это решение превосходит предыдущее, если количество обработанных элементов Stream относительно невелико, но по мере увеличения отношения обработано/размер_коллекции пропускная способность резко падает.
Это все из-за дополнительных накладных расходов, возникающих при удалении элементов из ArrayList, содержащих оставшиеся объекты — каждое удаление требует сдвига внутреннего массива на единицу с использованием относительно дорогого метода System#arraycopy.
Мы можем заметить аналогичную закономерность для гораздо больших коллекций (1_000_000 элементов):
(limit) (size) Mode Cnt Score Err Units
eager 1 10000000 thrpt 5 0.915 ops/s
eager 10 10000000 thrpt 5 0.783 ops/s
eager 100 10000000 thrpt 5 0.965 ops/s
eager 1000 10000000 thrpt 5 0.936 ops/s
eager 10000 10000000 thrpt 5 0.860 ops/s
lazy 1 10000000 thrpt 5 4.338 ops/s
lazy 10 10000000 thrpt 5 3.149 ops/s
lazy 100 10000000 thrpt 5 2.060 ops/s
lazy 1000 10000000 thrpt 5 0.370 ops/s
lazy 10000 10000000 thrpt 5 0.05 ops/s
…и намного меньше (128 элементов, помните о масштабе!):
(limit) (size) Mode Cnt Score Error Units
eager 2 128 thrpt 5 246439.459 ops/s
eager 4 128 thrpt 5 333866.936 ops/s
eager 8 128 thrpt 5 340296.188 ops/s
eager 16 128 thrpt 5 345533.673 ops/s
eager 32 128 thrpt 5 231725.156 ops/s
eager 64 128 thrpt 5 314324.265 ops/s
eager 128 128 thrpt 5 270451.992 ops/s
lazy 2 128 thrpt 5 765989.718 ops/s
lazy 4 128 thrpt 5 659421.041 ops/s
lazy 8 128 thrpt 5 652685.515 ops/s
lazy 16 128 thrpt 5 470346.570 ops/s
lazy 32 128 thrpt 5 324174.691 ops/s
lazy 64 128 thrpt 5 186472.090 ops/s
lazy 128 128 thrpt 5 108105.699 ops/s
Но можем ли мы сделать лучше, чем это?
Дальнейшие улучшения производительности
К сожалению, масштабируемость существующего решения весьма разочаровывает. Давайте попробуем улучшить его, но прежде чем мы это сделаем, мы должны сначала измерить:
Как и ожидалось, Arraylist#remove оказывается одной из горячих точек — другими словами, ЦП тратит заметное количество времени на удаление элементов из ArrayList.
Почему это? Удаление из ArrayList включает удаление элемента из базового массива. Загвоздка в том, что массивы в Java не могут быть изменены — каждое удаление вызывает создание нового массива меньшего размера:
private void fastRemove(Object[] es, int i) {
modCount++;
final int newSize;
if ((newSize = size - 1) > i)
System.arraycopy(es, i + 1, es, i, newSize - i);
es[size = newSize] = null;
}
Что мы можем с этим сделать? Избегайте удаления элементов из ArrayList.
Для этого мы могли бы хранить оставшиеся элементы в массиве и отслеживать его размер отдельно:
public class ImprovedRandomSpliterator<T> implements Spliterator<T> {
private final Random random;
private final T[] source;
private int size;
private ImprovedRandomSpliterator(
List<T> source, Supplier<? extends Random> random) {
if (source.isEmpty()) {
throw new IllegalArgumentException(...);
}
this.source = (T[]) source.toArray();
this.random = random.get();
this.size = this.source.length;
}
}
К счастью, мы можем избежать проблем с параллелизмом, поскольку экземпляры этого Spliterator не должны использоваться совместно между потоками.
И теперь всякий раз, когда мы пытаемся удалить элемент, нам не нужно фактически создавать новый уменьшенный массив. Вместо этого мы можем уменьшить размер нашего трекера и проигнорировать оставшуюся часть массива.
Но прямо перед этим нам нужно поменять местами последний элемент с возвращаемым элементом:
@Override
public boolean tryAdvance(Consumer<? super T> action) {
int nextIdx = random.nextInt(size);
int lastIdx = size - 1;
action.accept(source[nextIdx]);
source[nextIdx] = source[lastIdx];
source[lastIdx] = null; // let object be GCed
return --size > 0;
}
Если мы профилируем его сейчас, мы увидим, что дорогостоящий вызов исчез:
Мы готовы повторно запустить тесты и сравнить:
(limit) (size) Mode Cnt Score Error Units
eager 1 100000 thrpt 3 456.811 ± 20.585 ops/s
eager 10 100000 thrpt 3 469.635 ± 23.281 ops/s
eager 100 100000 thrpt 3 466.486 ± 68.820 ops/s
eager 1000 100000 thrpt 3 454.459 ± 13.103 ops/s
eager 10000 100000 thrpt 3 443.640 ± 96.929 ops/s
eager 100000 100000 thrpt 3 335.134 ± 21.944 ops/s
lazy 1 100000 thrpt 3 1587.536 ± 389.128 ops/s
lazy 10 100000 thrpt 3 1452.855 ± 406.879 ops/s
lazy 100 100000 thrpt 3 814.978 ± 242.077 ops/s
lazy 1000 100000 thrpt 3 167.825 ± 129.559 ops/s
lazy 10000 100000 thrpt 3 19.782 ± 8.596 ops/s
lazy 100000 100000 thrpt 3 3.970 ± 0.408 ops/s
lazy_improved 1 100000 thrpt 3 1509.264 ± 170.423 ops/s
lazy_improved 10 100000 thrpt 3 1512.150 ± 143.927 ops/s
lazy_improved 100 100000 thrpt 3 1463.093 ± 593.370 ops/s
lazy_improved 1000 100000 thrpt 3 1451.007 ± 58.948 ops/s
lazy_improved 10000 100000 thrpt 3 1148.581 ± 232.218 ops/s
lazy_improved 100000 100000 thrpt 3 383.022 ± 97.082 ops/s
Как видите, мы получили реализацию, которая более устойчива с точки зрения производительности к количеству элементов, к которым мы стремимся.
На самом деле улучшенная реализация работает немного лучше, чем реализация на основе Collections#shuffle даже в пессимистическом сценарии!
Полный пример
…может быть и найдено на GitHub.
package com.pivovarit.stream;
import java.util.List;
import java.util.Random;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.function.Supplier;
public class ImprovedRandomSpliterator<T> implements Spliterator<T> {
private final Random random;
private final T[] source;
private int size;
ImprovedRandomSpliterator(List<T> source, Supplier<? extends Random> random) {
if (source.isEmpty()) {
throw new IllegalArgumentException("RandomSpliterator can't be initialized with an empty collection");
}
this.source = (T[]) source.toArray();
this.random = random.get();
this.size = this.source.length;
}
@Override
public boolean tryAdvance(Consumer<? super T> action) {
int nextIdx = random.nextInt(size);
int lastIdx = size - 1;
action.accept(source[nextIdx]);
source[nextIdx] = source[lastIdx];
source[lastIdx] = null; // let object be GCed
return --size > 0;
}
@Override
public Spliterator<T> trySplit() {
return null;
}
@Override
public long estimateSize() {
return source.length;
}
@Override
public int characteristics() {
return SIZED;
}
}
package com.pivovarit.stream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Random;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import static java.util.stream.Collectors.toCollection;
public final class RandomCollectors {
private RandomCollectors() {
}
public static <T> Collector<T, ?, Stream<T>> toImprovedLazyShuffledStream() {
return Collectors.collectingAndThen(
toCollection(ArrayList::new),
list -> !list.isEmpty()
? StreamSupport.stream(new ImprovedRandomSpliterator<>(list, Random::new), false)
: Stream.empty());
}
public static <T> Collector<T, ?, Stream<T>> toLazyShuffledStream() {
return Collectors.collectingAndThen(
toCollection(ArrayList::new),
list -> !list.isEmpty()
? StreamSupport.stream(new RandomSpliterator<>(list, Random::new), false)
: Stream.empty());
}
public static <T> Collector<T, ?, Stream<T>> toEagerShuffledStream() {
return Collectors.collectingAndThen(
toCollection(ArrayList::new),
list -> {
Collections.shuffle(list);
return list.stream();
});
}
}