34
34
import java .util .concurrent .*;
35
35
import java .util .concurrent .locks .LockSupport ;
36
36
import java .util .function .Consumer ;
37
+ import java .util .function .Supplier ;
37
38
38
39
/**
39
40
* <strong>INTERNAL API - This class is subject to change.</strong>
@@ -100,7 +101,12 @@ private FutureImpl(Executor executor, Option<Try<T>> value, Queue<Consumer<Try<T
100
101
this .actions = actions ;
101
102
this .waiters = waiters ;
102
103
try {
103
- computation .execute (this ::tryComplete , this ::updateThread );
104
+ computation .execute (this ::tryComplete , this ::updateThread , () -> {
105
+ // Synchronize as the future could be in the process of cancelling
106
+ synchronized (lock ) {
107
+ return isCompleted ();
108
+ }
109
+ });
104
110
} catch (Throwable x ) {
105
111
tryComplete (Try .failure (x ));
106
112
}
@@ -115,7 +121,7 @@ private FutureImpl(Executor executor, Option<Try<T>> value, Queue<Consumer<Try<T
115
121
* @return a new {@code FutureImpl} instance
116
122
*/
117
123
static <T > FutureImpl <T > of (Executor executor ) {
118
- return new FutureImpl <>(executor , Option .none (), Queue .empty (), Queue .empty (), (complete , updateThread ) -> {});
124
+ return new FutureImpl <>(executor , Option .none (), Queue .empty (), Queue .empty (), (complete , updateThread , isCompleted ) -> {});
119
125
}
120
126
121
127
/**
@@ -127,7 +133,7 @@ static <T> FutureImpl<T> of(Executor executor) {
127
133
* @return a new {@code FutureImpl} instance
128
134
*/
129
135
static <T > FutureImpl <T > of (Executor executor , Try <? extends T > value ) {
130
- return new FutureImpl <>(executor , Option .some (Try .narrow (value )), null , null , (complete , updateThread ) -> {});
136
+ return new FutureImpl <>(executor , Option .some (Try .narrow (value )), null , null , (complete , updateThread , isCompleted ) -> {});
131
137
}
132
138
133
139
/**
@@ -140,7 +146,7 @@ static <T> FutureImpl<T> of(Executor executor, Try<? extends T> value) {
140
146
* @return a new {@code FutureImpl} instance
141
147
*/
142
148
static <T > FutureImpl <T > sync (Executor executor , Task <? extends T > task ) {
143
- return new FutureImpl <>(executor , Option .none (), Queue .empty (), Queue .empty (), (complete , updateThread ) ->
149
+ return new FutureImpl <>(executor , Option .none (), Queue .empty (), Queue .empty (), (complete , updateThread , isCompleted ) ->
144
150
task .run (complete ::with )
145
151
);
146
152
}
@@ -156,8 +162,12 @@ static <T> FutureImpl<T> sync(Executor executor, Task<? extends T> task) {
156
162
*/
157
163
static <T > FutureImpl <T > async (Executor executor , Task <? extends T > task ) {
158
164
// In a single-threaded context this Future may already have been completed during initialization.
159
- return new FutureImpl <>(executor , Option .none (), Queue .empty (), Queue .empty (), (complete , updateThread ) ->
165
+ return new FutureImpl <>(executor , Option .none (), Queue .empty (), Queue .empty (), (complete , updateThread , isCompleted ) ->
160
166
executor .execute (() -> {
167
+ // Avoid performing work, if future is already complete (normally by cancellation)
168
+ if (isCompleted .get ()) {
169
+ return ;
170
+ }
161
171
updateThread .run ();
162
172
try {
163
173
task .run (complete ::with );
@@ -414,6 +424,6 @@ private void handleUncaughtException(Throwable x) {
414
424
}
415
425
416
426
private interface Computation <T > {
417
- void execute (Task .Complete <T > complete , Runnable updateThread ) throws Throwable ;
427
+ void execute (Task .Complete <T > complete , Runnable updateThread , Supplier < Boolean > isCompleted ) throws Throwable ;
418
428
}
419
429
}
0 commit comments