Skip to content

Commit 1835c37

Browse files
authored
Use a calling thread when using blocking streaming instead of creating a new one (#515)
1 parent c365e96 commit 1835c37

File tree

3 files changed

+72
-29
lines changed

3 files changed

+72
-29
lines changed

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
<groupId>com.pivovarit</groupId>
2121
<artifactId>parallel-collectors</artifactId>
22-
<version>2.3.4-SNAPSHOT</version>
22+
<version>2.4.0-SNAPSHOT</version>
2323

2424
<packaging>jar</packaging>
2525

src/main/java/com/pivovarit/collectors/ParallelStreamCollector.java

Lines changed: 24 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import java.util.Set;
77
import java.util.concurrent.CompletableFuture;
88
import java.util.concurrent.Executor;
9+
import java.util.concurrent.Semaphore;
910
import java.util.function.BiConsumer;
1011
import java.util.function.BinaryOperator;
1112
import java.util.function.Function;
@@ -31,26 +32,22 @@ class ParallelStreamCollector<T, R> implements Collector<T, Stream.Builder<Compl
3132

3233
private static final EnumSet<Characteristics> UNORDERED = EnumSet.of(Characteristics.UNORDERED);
3334

34-
private final Dispatcher<R> dispatcher;
3535
private final Function<T, R> function;
3636
private final CompletionStrategy<R> completionStrategy;
3737
private final Set<Characteristics> characteristics;
38+
private final Semaphore limiter;
39+
private final Executor executor;
3840

3941
private ParallelStreamCollector(
4042
Function<T, R> function,
4143
CompletionStrategy<R> completionStrategy,
4244
Set<Characteristics> characteristics,
43-
Dispatcher<R> dispatcher) {
45+
Executor executor, int parallelism) {
4446
this.completionStrategy = completionStrategy;
4547
this.characteristics = characteristics;
46-
this.dispatcher = dispatcher;
48+
this.limiter = new Semaphore(parallelism);
4749
this.function = function;
48-
}
49-
50-
private void startConsuming() {
51-
if (!dispatcher.isRunning()) {
52-
dispatcher.start();
53-
}
50+
this.executor = executor;
5451
}
5552

5653
@Override
@@ -61,8 +58,19 @@ public Supplier<Stream.Builder<CompletableFuture<R>>> supplier() {
6158
@Override
6259
public BiConsumer<Stream.Builder<CompletableFuture<R>>, T> accumulator() {
6360
return (acc, e) -> {
64-
startConsuming();
65-
acc.add(dispatcher.enqueue(() -> function.apply(e)));
61+
try {
62+
limiter.acquire();
63+
acc.add(CompletableFuture.supplyAsync(() -> {
64+
try {
65+
return function.apply(e);
66+
} finally {
67+
limiter.release();
68+
}
69+
}, executor));
70+
} catch (InterruptedException interruptedException) {
71+
Thread.currentThread().interrupt();
72+
throw new RuntimeException(interruptedException);
73+
}
6674
};
6775
}
6876

@@ -75,10 +83,7 @@ public BinaryOperator<Stream.Builder<CompletableFuture<R>>> combiner() {
7583

7684
@Override
7785
public Function<Stream.Builder<CompletableFuture<R>>, Stream<R>> finisher() {
78-
return acc -> {
79-
dispatcher.stop();
80-
return completionStrategy.apply(acc.build());
81-
};
86+
return acc -> completionStrategy.apply(acc.build());
8287
}
8388

8489
@Override
@@ -97,7 +102,7 @@ public Set<Characteristics> characteristics() {
97102

98103
return parallelism == 1
99104
? BatchingCollectors.syncCollector(mapper)
100-
: new ParallelStreamCollector<>(mapper, unordered(), UNORDERED, Dispatcher.of(executor, parallelism));
105+
: new ParallelStreamCollector<>(mapper, unordered(), UNORDERED, executor, parallelism);
101106
}
102107

103108
static <T, R> Collector<T, ?, Stream<R>> streamingOrdered(Function<T, R> mapper, Executor executor) {
@@ -111,7 +116,7 @@ public Set<Characteristics> characteristics() {
111116

112117
return parallelism == 1
113118
? BatchingCollectors.syncCollector(mapper)
114-
: new ParallelStreamCollector<>(mapper, ordered(), emptySet(), Dispatcher.of(executor, parallelism));
119+
: new ParallelStreamCollector<>(mapper, ordered(), emptySet(), executor, parallelism);
115120
}
116121

117122
static final class BatchingCollectors {
@@ -125,7 +130,7 @@ private BatchingCollectors() {
125130

126131
return parallelism == 1
127132
? syncCollector(mapper)
128-
: batched(new ParallelStreamCollector<>(batching(mapper), unordered(), UNORDERED, Dispatcher.of(executor, parallelism)), parallelism);
133+
: batched(new ParallelStreamCollector<>(batching(mapper), unordered(), UNORDERED, executor, parallelism), parallelism);
129134
}
130135

131136
static <T, R> Collector<T, ?, Stream<R>> streamingOrdered(Function<T, R> mapper, Executor executor, int parallelism) {
@@ -135,7 +140,7 @@ private BatchingCollectors() {
135140

136141
return parallelism == 1
137142
? syncCollector(mapper)
138-
: batched(new ParallelStreamCollector<>(batching(mapper), ordered(), emptySet(), Dispatcher.of(executor, parallelism)), parallelism);
143+
: batched(new ParallelStreamCollector<>(batching(mapper), ordered(), emptySet(), executor, parallelism), parallelism);
139144
}
140145

141146
private static <T, R> Collector<T, ?, Stream<R>> batched(ParallelStreamCollector<List<T>, List<R>> downstream, int parallelism) {

src/test/java/com/pivovarit/collectors/blackbox/FunctionalTest.java

Lines changed: 47 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -68,9 +68,7 @@ Stream<DynamicTest> collectors() {
6868
tests((m, e, p) -> parallel(m, toList(), e, p), format("ParallelCollectors.parallel(toList(), p=%d)", PARALLELISM), true),
6969
tests((m, e, p) -> parallel(m, toSet(), e, p), format("ParallelCollectors.parallel(toSet(), p=%d)", PARALLELISM), false),
7070
tests((m, e, p) -> parallel(m, toCollection(LinkedList::new), e, p), format("ParallelCollectors.parallel(toCollection(), p=%d)", PARALLELISM), true),
71-
tests((m, e, p) -> adapt(parallel(m, e, p)), format("ParallelCollectors.parallel(p=%d)", PARALLELISM), true),
72-
tests((m, e, p) -> adaptAsync(parallelToStream(m, e, p)), format("ParallelCollectors.parallelToStream(p=%d)", PARALLELISM), false),
73-
tests((m, e, p) -> adaptAsync(parallelToOrderedStream(m, e, p)), format("ParallelCollectors.parallelToOrderedStream(p=%d)", PARALLELISM), true)
71+
tests((m, e, p) -> adapt(parallel(m, e, p)), format("ParallelCollectors.parallel(p=%d)", PARALLELISM), true)
7472
).flatMap(identity());
7573
}
7674

@@ -80,9 +78,23 @@ Stream<DynamicTest> batching_collectors() {
8078
batchTests((m, e, p) -> Batching.parallel(m, toList(), e, p), format("ParallelCollectors.Batching.parallel(toList(), p=%d)", PARALLELISM), true),
8179
batchTests((m, e, p) -> Batching.parallel(m, toSet(), e, p), format("ParallelCollectors.Batching.parallel(toSet(), p=%d)", PARALLELISM), false),
8280
batchTests((m, e, p) -> Batching.parallel(m, toCollection(LinkedList::new), e, p), format("ParallelCollectors.Batching.parallel(toCollection(), p=%d)", PARALLELISM), true),
83-
batchTests((m, e, p) -> adapt(Batching.parallel(m, e, p)), format("ParallelCollectors.Batching.parallel(p=%d)", PARALLELISM), true),
84-
batchTests((m, e, p) -> adaptAsync(Batching.parallelToStream(m, e, p)), format("ParallelCollectors.Batching.parallelToStream(p=%d)", PARALLELISM), false),
85-
batchTests((m, e, p) -> adaptAsync(Batching.parallelToOrderedStream(m, e, p)), format("ParallelCollectors.Batching.parallelToOrderedStream(p=%d)", PARALLELISM), true)
81+
batchTests((m, e, p) -> adapt(Batching.parallel(m, e, p)), format("ParallelCollectors.Batching.parallel(p=%d)", PARALLELISM), true)
82+
).flatMap(identity());
83+
}
84+
85+
@TestFactory
86+
Stream<DynamicTest> streaming_collectors() {
87+
return of(
88+
streamingTests((m, e, p) -> adaptAsync(parallelToStream(m, e, p)), format("ParallelCollectors.parallelToStream(p=%d)", PARALLELISM), false),
89+
streamingTests((m, e, p) -> adaptAsync(parallelToOrderedStream(m, e, p)), format("ParallelCollectors.parallelToOrderedStream(p=%d)", PARALLELISM), true)
90+
).flatMap(identity());
91+
}
92+
93+
@TestFactory
94+
Stream<DynamicTest> streaming_batching_collectors() {
95+
return of(
96+
batchStreamingTests((m, e, p) -> adaptAsync(Batching.parallelToStream(m, e, p)), format("ParallelCollectors.Batching.parallelToStream(p=%d)", PARALLELISM), false),
97+
batchStreamingTests((m, e, p) -> adaptAsync(Batching.parallelToOrderedStream(m, e, p)), format("ParallelCollectors.Batching.parallelToOrderedStream(p=%d)", PARALLELISM), true)
8698
).flatMap(identity());
8799
}
88100

@@ -139,12 +151,34 @@ private static <R extends Collection<Integer>> Stream<DynamicTest> tests(Collect
139151
);
140152
}
141153

154+
private static <R extends Collection<Integer>> Stream<DynamicTest> streamingTests(CollectorSupplier<Function<Integer, Integer>, Executor, Integer, Collector<Integer, ?, CompletableFuture<R>>> collector, String name, boolean maintainsOrder) {
155+
return of(
156+
shouldCollect(collector, name, 1),
157+
shouldCollect(collector, name, PARALLELISM),
158+
shouldCollectToEmpty(collector, name),
159+
shouldStartConsumingImmediately(collector, name),
160+
shouldNotBlockTheCallingThread(collector, name),
161+
shouldMaintainOrder(collector, name, maintainsOrder),
162+
shouldRespectParallelism(collector, name),
163+
shouldHandleThrowable(collector, name),
164+
shouldShortCircuitOnException(collector, name),
165+
shouldHandleRejectedExecutionException(collector, name),
166+
shouldRemainConsistent(collector, name)
167+
);
168+
}
169+
142170
private static <R extends Collection<Integer>> Stream<DynamicTest> batchTests(CollectorSupplier<Function<Integer, Integer>, Executor, Integer, Collector<Integer, ?, CompletableFuture<R>>> collector, String name, boolean maintainsOrder) {
143171
return Stream.concat(
144172
tests(collector, name, maintainsOrder),
145173
of(shouldProcessOnNThreadsETParallelism(collector, name)));
146174
}
147175

176+
private static <R extends Collection<Integer>> Stream<DynamicTest> batchStreamingTests(CollectorSupplier<Function<Integer, Integer>, Executor, Integer, Collector<Integer, ?, CompletableFuture<R>>> collector, String name, boolean maintainsOrder) {
177+
return Stream.concat(
178+
streamingTests(collector, name, maintainsOrder),
179+
of(shouldProcessOnNThreadsETParallelism(collector, name)));
180+
}
181+
148182
private static <R extends Collection<Integer>> DynamicTest shouldNotBlockTheCallingThread(CollectorSupplier<Function<Integer, Integer>, Executor, Integer, Collector<Integer, ?, CompletableFuture<R>>> c, String name) {
149183
return dynamicTest(format("%s: should not block when returning future", name), () -> {
150184
assertTimeoutPreemptively(ofMillis(100), () ->
@@ -258,15 +292,19 @@ private static <R extends Collection<Integer>> DynamicTest shouldHandleThrowable
258292
}
259293

260294
private static <R extends Collection<Integer>> DynamicTest shouldHandleRejectedExecutionException(CollectorSupplier<Function<Integer, Integer>, Executor, Integer, Collector<Integer, ?, CompletableFuture<R>>> collector, String name) {
261-
return dynamicTest(format("%s: should survive rejected execution exception", name), () -> {
295+
return dynamicTest(format("%s: should propagate rejected execution exception", name), () -> {
262296
Executor executor = command -> { throw new RejectedExecutionException(); };
263297
List<Integer> elements = IntStream.range(0, 1000).boxed().collect(toList());
264298

265299
assertThatThrownBy(() -> elements.stream()
266300
.collect(collector.apply(i -> returnWithDelay(i, ofMillis(10000)), executor, PARALLELISM))
267301
.join())
268-
.isInstanceOf(CompletionException.class)
269-
.hasCauseExactlyInstanceOf(RejectedExecutionException.class);
302+
.isInstanceOfAny(RejectedExecutionException.class, CompletionException.class)
303+
.matches(ex -> {
304+
if (ex instanceof CompletionException) {
305+
return ex.getCause() instanceof RejectedExecutionException;
306+
} else return true;
307+
});
270308
});
271309
}
272310

0 commit comments

Comments
 (0)