Skip to content

Commit f70a56d

Browse files
committed
Fix onTimeout / orTimeout handling for single-threaded executors; simplify CallbackRegistry; fix toCompletableFuture completion in wrappers
1 parent 5f4ecab commit f70a56d

File tree

11 files changed

+173
-179
lines changed

11 files changed

+173
-179
lines changed

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
<groupId>net.tascalate</groupId>
88
<artifactId>net.tascalate.concurrent</artifactId>
9-
<version>0.9.3</version>
9+
<version>0.9.4-SNAPSHOT</version>
1010
<packaging>jar</packaging>
1111

1212
<name>Tascalate Concurrent</name>

src/main/java/net/tascalate/concurrent/AbstractCompletableTask.java

Lines changed: 13 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ public String toString() {
144144
@Override
145145
public <U> Promise<U> thenApplyAsync(Function<? super T, ? extends U> fn, Executor executor) {
146146
return addCallbacks(
147-
internalCreateCompletionStage(executor),
147+
newSubTask(executor),
148148
r -> {
149149
try {
150150
return fn.apply(r);
@@ -161,7 +161,7 @@ public <U> Promise<U> thenApplyAsync(Function<? super T, ? extends U> fn, Execut
161161
public Promise<T> exceptionallyAsync(Function<Throwable, ? extends T> fn, Executor executor) {
162162
// Symmetrical with thenApplyAsync
163163
return addCallbacks(
164-
internalCreateCompletionStage(executor),
164+
newSubTask(executor),
165165
Function.identity(),
166166
failure -> {
167167
try {
@@ -180,7 +180,7 @@ public Promise<T> exceptionallyAsync(Function<Throwable, ? extends T> fn, Execut
180180
@Override
181181
public <U> Promise<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) {
182182
return addCallbacks(
183-
internalCreateCompletionStage(executor),
183+
newSubTask(executor),
184184
result -> {
185185
try {
186186
return fn.apply(result, null);
@@ -211,8 +211,8 @@ public <U> Promise<U> handleAsync(BiFunction<? super T, Throwable, ? extends U>
211211
@Override
212212
public <U> Promise<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor) {
213213

214-
AbstractCompletableTask<Void> tempStage = internalCreateCompletionStage(executor);
215-
AbstractCompletableTask<U> nextStage = internalCreateCompletionStage(executor);
214+
AbstractCompletableTask<Void> tempStage = newSubTask(executor);
215+
AbstractCompletableTask<U> nextStage = newSubTask(executor);
216216
// Need to enlist tempStage while it is non-visible outside
217217
// and may not be used to interrupt fn.apply();
218218
nextStage.intermediateStage = tempStage;
@@ -285,8 +285,8 @@ public <U> Promise<U> thenComposeAsync(Function<? super T, ? extends CompletionS
285285
public Promise<T> exceptionallyComposeAsync(Function<Throwable, ? extends CompletionStage<T>> fn, Executor executor) {
286286
// Symmetrical with thenComposeAsync
287287
// See comments for thenComposeAsync -- all are valid here, this is just a different path in Either (left vs right)
288-
AbstractCompletableTask<Void> tempStage = internalCreateCompletionStage(executor);
289-
AbstractCompletableTask<T> nextStage = internalCreateCompletionStage(executor);
288+
AbstractCompletableTask<Void> tempStage = newSubTask(executor);
289+
AbstractCompletableTask<T> nextStage = newSubTask(executor);
290290

291291
nextStage.intermediateStage = tempStage;
292292

@@ -330,7 +330,7 @@ public Promise<T> exceptionallyComposeAsync(Function<Throwable, ? extends Comple
330330
@Override
331331
public Promise<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor) {
332332
return addCallbacks(
333-
internalCreateCompletionStage(executor),
333+
newSubTask(executor),
334334
result -> {
335335
try {
336336
action.accept(result, null);
@@ -363,26 +363,9 @@ public Promise<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> act
363363
@Override
364364
public CompletableFuture<T> toCompletableFuture() {
365365
CompletableFuture<T> result = new CompletableFuture<>();
366-
// nextStage is CompletableFuture rather than AbstractCompletableTask
367-
// so trigger completion on ad-hoc runnable rather than on
368-
// nextStage.task
369-
Consumer<Callable<T>> setup = c -> {
370-
try {
371-
c.call();
372-
} catch (Throwable ex) {
373-
result.completeExceptionally(ex);
374-
}
375-
};
376-
addCallbacks(
377-
setup,
378-
r -> op(result.complete(r)),
379-
e -> op(result.completeExceptionally(e)),
380-
SAME_THREAD_EXECUTOR
381-
);
366+
whenComplete((r, e) -> iif(e == null ? result.complete(r) : result.completeExceptionally(e)));
382367
return result;
383368
}
384-
385-
private static <T> T op(boolean v) { return null; }
386369

387370
/**
388371
* This method exists just to reconcile generics when called from
@@ -395,7 +378,7 @@ protected <R, U> Promise<U> doApplyToEitherAsync(CompletionStage<? extends R> fi
395378
Function<? super R, U> fn,
396379
Executor executor) {
397380

398-
AbstractCompletableTask<R> nextStage = internalCreateCompletionStage(executor);
381+
AbstractCompletableTask<R> nextStage = newSubTask(executor);
399382

400383
// Next stage is not exposed to the client, so we can
401384
// short-circuit its initiation - just fire callbacks
@@ -416,8 +399,8 @@ protected <R, U> Promise<U> doApplyToEitherAsync(CompletionStage<? extends R> fi
416399
}
417400

418401
abstract protected <U> AbstractCompletableTask<U> createCompletionStage(Executor executor);
419-
420-
private <U> AbstractCompletableTask<U> internalCreateCompletionStage(Executor executor) {
402+
403+
private <U> AbstractCompletableTask<U> newSubTask(Executor executor) {
421404
// Preserve default async executor, or use user-supplied executor as default
422405
// But don't let SAME_THREAD_EXECUTOR to be a default async executor
423406
return createCompletionStage(executor == SAME_THREAD_EXECUTOR ? getDefaultExecutor() : executor);
@@ -450,16 +433,8 @@ private <U> AbstractCompletableTask<U> addCallbacks(AbstractCompletableTask<U> t
450433
Function<Throwable, ? extends U> failureCallback,
451434
Executor executor) {
452435

453-
addCallbacks(targetStage::fireTransition, successCallback, failureCallback, executor);
436+
callbackRegistry.addCallbacks(targetStage, successCallback, failureCallback, executor);
454437
return targetStage;
455438
}
456439

457-
private <U> void addCallbacks(Consumer<? super Callable<U>> stageTransition,
458-
Function<? super T, ? extends U> successCallback,
459-
Function<Throwable, ? extends U> failureCallback,
460-
Executor executor) {
461-
462-
callbackRegistry.addCallbacks(stageTransition, successCallback, failureCallback, executor);
463-
}
464-
465440
}

src/main/java/net/tascalate/concurrent/CallbackRegistry.java

Lines changed: 56 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -22,25 +22,24 @@
2222
*/
2323
package net.tascalate.concurrent;
2424

25+
import java.util.Collection;
2526
import java.util.LinkedList;
2627
import java.util.Objects;
27-
import java.util.Queue;
2828
import java.util.concurrent.Callable;
2929
import java.util.concurrent.CompletableFuture;
3030
import java.util.concurrent.Executor;
3131
import java.util.concurrent.RejectedExecutionException;
32-
import java.util.function.Consumer;
32+
import java.util.function.BiConsumer;
3333
import java.util.function.Function;
3434

3535
class CallbackRegistry<T> {
36-
private State<T> state = InitialState.instance();
37-
3836
private final Object mutex = new Object();
39-
37+
private State<T> state = InitialState.instance();
38+
4039
/**
4140
* Adds the given callbacks to this registry.
4241
*/
43-
<U> void addCallbacks(Consumer<? super Callable<U>> stageTransition,
42+
<U> void addCallbacks(AbstractCompletableTask<U> target,
4443
Function<? super T, ? extends U> successCallback,
4544
Function<Throwable, ? extends U> failureCallback,
4645
Executor executor) {
@@ -49,11 +48,8 @@ <U> void addCallbacks(Consumer<? super Callable<U>> stageTransition,
4948
Objects.requireNonNull(failureCallback, "'failureCallback' must not be null");
5049
Objects.requireNonNull(executor, "'executor' must not be null");
5150

52-
@SuppressWarnings("unchecked")
53-
Consumer<? super Callable<?>> typedTransition = (Consumer<? super Callable<?>>)stageTransition;
54-
5551
synchronized (mutex) {
56-
state = state.addCallbacks(typedTransition, successCallback, failureCallback, executor);
52+
state = state.addCallbacks(target, successCallback, failureCallback, executor);
5753
}
5854
}
5955

@@ -114,11 +110,11 @@ boolean isCompleted() {
114110
* State of the registry. All subclasses are meant to be used form a
115111
* synchronized block and are NOT thread safe on their own.
116112
*/
117-
private static abstract class State<S> {
118-
protected abstract State<S> addCallbacks(Consumer<? super Callable<?>> stageTransition,
119-
Function<? super S, ?> successCallback,
120-
Function<Throwable, ?> failureCallback,
121-
Executor executor);
113+
static abstract class State<S> {
114+
protected abstract <U> State<S> addCallbacks(AbstractCompletableTask<U> target,
115+
Function<? super S, ? extends U> successCallback,
116+
Function<Throwable, ? extends U> failureCallback,
117+
Executor executor);
122118

123119
protected State<S> getSuccessState(S result) {
124120
throw new IllegalStateException("success method should not be called multiple times");
@@ -147,17 +143,17 @@ protected boolean isFailure() {
147143
* Result is not known yet and no callbacks registered. Using shared
148144
* instance so we do not allocate instance where it may not be needed.
149145
*/
150-
private static class InitialState<S> extends State<S> {
146+
static class InitialState<S> extends State<S> {
151147
private static final InitialState<Object> instance = new InitialState<>();
152148

153149
@Override
154-
protected State<S> addCallbacks(Consumer<? super Callable<?>> stageTransition,
155-
Function<? super S, ?> successCallback,
156-
Function<Throwable, ?> failureCallback,
157-
Executor executor) {
150+
protected <U> State<S> addCallbacks(AbstractCompletableTask<U> target,
151+
Function<? super S, ? extends U> successCallback,
152+
Function<Throwable, ? extends U> failureCallback,
153+
Executor executor) {
158154

159155
IntermediateState<S> intermediateState = new IntermediateState<>();
160-
intermediateState.addCallbacks(stageTransition, successCallback, failureCallback, executor);
156+
intermediateState.addCallbacks(target, successCallback, failureCallback, executor);
161157
return intermediateState;
162158
}
163159

@@ -177,24 +173,30 @@ protected boolean isCompleted() {
177173
}
178174

179175
@SuppressWarnings("unchecked")
180-
private static <T> State<T> instance() {
176+
static <T> State<T> instance() {
181177
return (State<T>) instance;
182178
}
183179
}
184180

185181
/**
186182
* Result is not known yet.
187183
*/
188-
private static class IntermediateState<S> extends State<S> {
189-
private final Queue<CallbackHolder<? super S>> callbacks = new LinkedList<>();
184+
static class IntermediateState<S> extends State<S> {
185+
private final Collection<BiConsumer<? super S, ? super Throwable>> callbacks = new LinkedList<>();
190186

191187
@Override
192-
protected State<S> addCallbacks(Consumer<? super Callable<?>> stageTransition,
193-
Function<? super S, ?> successCallback,
194-
Function<Throwable, ?> failureCallback,
195-
Executor executor) {
188+
protected <U> State<S> addCallbacks(AbstractCompletableTask<U> target,
189+
Function<? super S, ? extends U> successCallback,
190+
Function<Throwable, ? extends U> failureCallback,
191+
Executor executor) {
196192

197-
callbacks.add(new CallbackHolder<>(stageTransition, successCallback, failureCallback, executor));
193+
callbacks.add((r, e) -> {
194+
if (null == e) {
195+
callCallback(target, successCallback, r, executor);
196+
} else {
197+
callCallback(target, failureCallback, e, executor);
198+
}
199+
});
198200
return this;
199201
}
200202

@@ -205,10 +207,8 @@ protected State<S> getSuccessState(S result) {
205207

206208
@Override
207209
protected void callSuccessCallbacks(S result) {
208-
// no need to remove callbacks from the queue, this instance will be
209-
// thrown away at once
210-
for (CallbackHolder<? super S> callback : callbacks) {
211-
callback.callSuccessCallback(result);
210+
for (BiConsumer<? super S, ? super Throwable> callback : callbacks) {
211+
callback.accept(result, null);
212212
}
213213
}
214214

@@ -219,10 +219,8 @@ protected State<S> getFailureState(Throwable failure) {
219219

220220
@Override
221221
protected void callFailureCallbacks(Throwable failure) {
222-
// no need to remove callbacks from the queue, this instance will be
223-
// thrown away at once
224-
for (CallbackHolder<? super S> callback : callbacks) {
225-
callback.callFailureCallback(failure);
222+
for (BiConsumer<? super S, ? super Throwable> callback : callbacks) {
223+
callback.accept(null, failure);
226224
}
227225
}
228226

@@ -235,41 +233,39 @@ protected boolean isCompleted() {
235233
/**
236234
* Holds the result.
237235
*/
238-
private static final class SuccessState<S> extends State<S> {
236+
static final class SuccessState<S> extends State<S> {
239237
private final S result;
240238

241-
private SuccessState(S result) {
239+
SuccessState(S result) {
242240
this.result = result;
243241
}
244242

245243
@Override
246-
protected State<S> addCallbacks(Consumer<? super Callable<?>> stageTransition,
247-
Function<? super S, ?> successCallback,
248-
Function<Throwable, ?> failureCallback,
249-
Executor executor) {
250-
251-
callCallback(stageTransition, successCallback, result, executor);
244+
protected <U> State<S> addCallbacks(AbstractCompletableTask<U> target,
245+
Function<? super S, ? extends U> successCallback,
246+
Function<Throwable, ? extends U> failureCallback,
247+
Executor executor) {
248+
callCallback(target, successCallback, result, executor);
252249
return this;
253250
}
254251
}
255252

256253
/**
257254
* Holds the failure.
258255
*/
259-
private static final class FailureState<S> extends State<S> {
256+
static final class FailureState<S> extends State<S> {
260257
private final Throwable failure;
261258

262-
private FailureState(Throwable failure) {
259+
FailureState(Throwable failure) {
263260
this.failure = failure;
264261
}
265262

266263
@Override
267-
protected State<S> addCallbacks(Consumer<? super Callable<?>> stageTransition,
268-
Function<? super S, ?> successCallback,
269-
Function<Throwable, ?> failureCallback,
270-
Executor executor) {
271-
272-
callCallback(stageTransition, failureCallback, failure, executor);
264+
protected <U> State<S> addCallbacks(AbstractCompletableTask<U> target,
265+
Function<? super S, ? extends U> successCallback,
266+
Function<Throwable, ? extends U> failureCallback,
267+
Executor executor) {
268+
callCallback(target, failureCallback, failure, executor);
273269
return this;
274270
}
275271

@@ -278,47 +274,21 @@ protected boolean isFailure() {
278274
}
279275
}
280276

281-
private static final class CallbackHolder<S> {
282-
private final Consumer<? super Callable<?>> stageTransition;
283-
private final Function<? super S, ?> successCallback;
284-
private final Function<Throwable, ?> failureCallback;
285-
private final Executor executor;
286-
287-
private CallbackHolder(Consumer<? super Callable<?>> stageTransition,
288-
Function<? super S, ?> successCallback,
289-
Function<Throwable, ?> failureCallback,
290-
Executor executor) {
291-
292-
this.stageTransition = stageTransition;
293-
this.successCallback = successCallback;
294-
this.failureCallback = failureCallback;
295-
this.executor = executor;
296-
}
297-
298-
void callSuccessCallback(S result) {
299-
callCallback(stageTransition, successCallback, result, executor);
300-
}
301-
302-
void callFailureCallback(Throwable failure) {
303-
callCallback(stageTransition, failureCallback, failure, executor);
304-
}
305-
}
306-
307-
private static <S, U> void callCallback(Consumer<? super Callable<?>> stageTransition,
308-
Function<? super S, ? extends U> callback,
309-
S value,
310-
Executor executor) {
277+
static <T, U> void callCallback(AbstractCompletableTask<U> target,
278+
Function<? super T, ? extends U> callback,
279+
T value,
280+
Executor executor) {
311281

312282
Callable<U> callable = () -> callback.apply(value);
313283
try {
314-
executor.execute( (AsyncTask)() -> stageTransition.accept(callable) );
284+
executor.execute( (AsyncTask)() -> target.fireTransition(callable) );
315285
} catch (RejectedExecutionException ex) {
316286
// Propagate error in-place
317287
Callable<U> propagateError = () -> { throw ex; };
318-
stageTransition.accept(propagateError);
288+
target.fireTransition(propagateError);
319289
}
320290
}
321-
291+
322292
@FunctionalInterface
323293
static interface AsyncTask extends Runnable, CompletableFuture.AsynchronousCompletionTask {}
324294

0 commit comments

Comments
 (0)