Skip to content

Commit ad4efc4

Browse files
Egor Tsinkopivovarit
authored andcommitted
Not started Future is now cancellable (#2750)
1 parent 707bd08 commit ad4efc4

File tree

2 files changed

+42
-6
lines changed

2 files changed

+42
-6
lines changed

src/main/java/io/vavr/concurrent/FutureImpl.java

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import java.util.concurrent.*;
3535
import java.util.concurrent.locks.LockSupport;
3636
import java.util.function.Consumer;
37+
import java.util.function.Supplier;
3738

3839
/**
3940
* <strong>INTERNAL API - This class is subject to change.</strong>
@@ -100,7 +101,12 @@ private FutureImpl(Executor executor, Option<Try<T>> value, Queue<Consumer<Try<T
100101
this.actions = actions;
101102
this.waiters = waiters;
102103
try {
103-
computation.execute(this::tryComplete, this::updateThread);
104+
computation.execute(this::tryComplete, this::updateThread, () -> {
105+
// Synchronize as the future could be in the process of cancelling
106+
synchronized (lock) {
107+
return isCompleted();
108+
}
109+
});
104110
} catch (Throwable x) {
105111
tryComplete(Try.failure(x));
106112
}
@@ -115,7 +121,7 @@ private FutureImpl(Executor executor, Option<Try<T>> value, Queue<Consumer<Try<T
115121
* @return a new {@code FutureImpl} instance
116122
*/
117123
static <T> FutureImpl<T> of(Executor executor) {
118-
return new FutureImpl<>(executor, Option.none(), Queue.empty(), Queue.empty(), (complete, updateThread) -> {});
124+
return new FutureImpl<>(executor, Option.none(), Queue.empty(), Queue.empty(), (complete, updateThread, isCompleted) -> {});
119125
}
120126

121127
/**
@@ -127,7 +133,7 @@ static <T> FutureImpl<T> of(Executor executor) {
127133
* @return a new {@code FutureImpl} instance
128134
*/
129135
static <T> FutureImpl<T> of(Executor executor, Try<? extends T> value) {
130-
return new FutureImpl<>(executor, Option.some(Try.narrow(value)), null, null, (complete, updateThread) -> {});
136+
return new FutureImpl<>(executor, Option.some(Try.narrow(value)), null, null, (complete, updateThread, isCompleted) -> {});
131137
}
132138

133139
/**
@@ -140,7 +146,7 @@ static <T> FutureImpl<T> of(Executor executor, Try<? extends T> value) {
140146
* @return a new {@code FutureImpl} instance
141147
*/
142148
static <T> FutureImpl<T> sync(Executor executor, Task<? extends T> task) {
143-
return new FutureImpl<>(executor, Option.none(), Queue.empty(), Queue.empty(), (complete, updateThread) ->
149+
return new FutureImpl<>(executor, Option.none(), Queue.empty(), Queue.empty(), (complete, updateThread, isCompleted) ->
144150
task.run(complete::with)
145151
);
146152
}
@@ -156,8 +162,12 @@ static <T> FutureImpl<T> sync(Executor executor, Task<? extends T> task) {
156162
*/
157163
static <T> FutureImpl<T> async(Executor executor, Task<? extends T> task) {
158164
// In a single-threaded context this Future may already have been completed during initialization.
159-
return new FutureImpl<>(executor, Option.none(), Queue.empty(), Queue.empty(), (complete, updateThread) ->
165+
return new FutureImpl<>(executor, Option.none(), Queue.empty(), Queue.empty(), (complete, updateThread, isCompleted) ->
160166
executor.execute(() -> {
167+
// Avoid performing work, if future is already complete (normally by cancellation)
168+
if (isCompleted.get()) {
169+
return;
170+
}
161171
updateThread.run();
162172
try {
163173
task.run(complete::with);
@@ -414,6 +424,6 @@ private void handleUncaughtException(Throwable x) {
414424
}
415425

416426
private interface Computation<T> {
417-
void execute(Task.Complete<T> complete, Runnable updateThread) throws Throwable;
427+
void execute(Task.Complete<T> complete, Runnable updateThread, Supplier<Boolean> isCompleted) throws Throwable;
418428
}
419429
}

src/test/java/io/vavr/concurrent/FutureTest.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
import static io.vavr.concurrent.Concurrent.waitUntil;
5555
import static java.util.concurrent.TimeUnit.MILLISECONDS;
5656
import static io.vavr.concurrent.Concurrent.zZz;
57+
import static java.util.concurrent.TimeUnit.SECONDS;
5758
import static org.assertj.core.api.Assertions.fail;
5859
import static org.junit.jupiter.api.Assertions.assertThrows;
5960

@@ -146,6 +147,31 @@ protected int getPeekNonNilPerformingAnAction() {
146147

147148
// -- static failed()
148149

150+
151+
@Test
152+
public void shouldNotExecuteFutureThatHasBeenCancelledBeforeItStarted() throws InterruptedException {
153+
ExecutorService es = Executors.newSingleThreadExecutor();
154+
155+
AtomicBoolean future2Executed = new AtomicBoolean(false);
156+
157+
// Submit f1 to the executor first
158+
Future<Void> f = Future.run(es, () -> Thread.sleep(1000));
159+
// Submit f2 next, it will have to wait to be executed
160+
Future<Void> f2 = Future.run(es, () -> {
161+
// Should never run this
162+
future2Executed.set(true);
163+
});
164+
165+
// Cancel f2 BEFORE it runs on the executor
166+
f2.cancel(true);
167+
f.cancel(true);
168+
es.shutdown();
169+
boolean terminated = es.awaitTermination(2, SECONDS);
170+
assertThat(terminated).isTrue();
171+
// f2 should never have run
172+
assertThat(future2Executed.get()).isFalse();
173+
}
174+
149175
@Test
150176
public void shouldCreateFailureThatFailsWithRuntimeException() {
151177
final Future<Object> failed = Future.failed(new RuntimeException("ooops")).await();

0 commit comments

Comments
 (0)