Skip to content

Commit 5e9d384

Browse files
committed
Optimise transition in thenCompose* (avoid unnecessary additional usage of Executor while we can run on same thread after tempStage)
1 parent 5a33cc3 commit 5e9d384

File tree

1 file changed

+22
-17
lines changed

1 file changed

+22
-17
lines changed

src/main/java/net/tascalate/concurrent/AbstractCompletableTask.java

Lines changed: 22 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -236,13 +236,9 @@ public <U> Promise<U> thenComposeAsync(Function<? super T, ? extends CompletionS
236236
// of nextStage.task when this nextStage is
237237
// exposed to the client, even in a "trivial" case:
238238
// Success path, just return value
239+
Consumer<? super U> onResult = nextStage.runTransition(Function.identity());
239240
// Failure path, just re-throw exception
240-
BiConsumer<? super U, ? super Throwable> moveToNextStage = (r, e) -> {
241-
if (null == e)
242-
runDirectly(nextStage, Function.identity(), r, executor);
243-
else
244-
runDirectly(nextStage, AbstractCompletableTask::forwardException, e, executor);
245-
};
241+
Consumer<? super Throwable> onError = nextStage.runTransition(AbstractCompletableTask::forwardException);
246242

247243
// Important -- tempStage is the target here
248244
addCallbacks(
@@ -276,23 +272,29 @@ public <U> Promise<U> thenComposeAsync(Function<? super T, ? extends CompletionS
276272
if (nextStage.isCancelled()) {
277273
nextStage.cancelOrigins(true);
278274
} else {
279-
returned.whenComplete(moveToNextStage);
275+
// Synchronous, while transition to tempStage is asynchronous already
276+
returned.whenComplete(biConsumer(onResult, onError));
280277
}
281278
} catch (Throwable ex) {
282279
// must-have if fn.apply above failed
283280
nextStage.resetCancellableOrigins((CompletionStage<U>)null);
284281
// no need to check nextStage.isCancelled()
285282
// while there are no origins to cancel
286-
// propagate error immediately
287-
moveToNextStage.accept(null, ex);
283+
// propagate error immediately
284+
// synchronous, while transition to tempStage is asynchronous already
285+
onError.accept(ex);
288286
}
289287
}),
290-
consumerAsFunction(e -> moveToNextStage.accept(null, e)),
288+
consumerAsFunction(onError),
291289
executor
292290
);
293291

294292
return nextStage;
295293
}
294+
295+
private <U> Consumer<? super U> runTransition(Function<? super U, ? extends T> converter) {
296+
return u -> setupTransition(() -> converter.apply(u)).run();
297+
}
296298

297299
@Override
298300
public Promise<T> exceptionally(Function<Throwable, ? extends T> fn) {
@@ -442,6 +444,16 @@ private static <R> Function<R, Void> runnableAsFunction(Runnable action) {
442444
return null;
443445
};
444446
}
447+
448+
private static <U, V> BiConsumer<U, V> biConsumer(Consumer<? super U> onResult, Consumer<? super V> onError) {
449+
return (u, v) -> {
450+
if (null == v) {
451+
onResult.accept(u);
452+
} else {
453+
onError.accept(v);
454+
}
455+
};
456+
}
445457

446458
private static <U> U forwardException(Throwable e) {
447459
throw wrapCompletionException(e);
@@ -474,11 +486,4 @@ private <U> void addCallbacks(Function<? super Callable<U>, ? extends Runnable>
474486
callbackRegistry.addCallbacks(targetSetup, successCallback, failureCallback, executor);
475487
}
476488

477-
private static <S, U> void runDirectly(AbstractCompletableTask<U> targetStage,
478-
Function<? super S, ? extends U> callback,
479-
S value,
480-
Executor executor) {
481-
482-
CallbackRegistry.callCallback(targetStage::setupTransition, callback, value, executor);
483-
}
484489
}

0 commit comments

Comments
 (0)