Skip to content

Commit cda3354

Browse files
Merge pull request #13 from ruslansennov/iss12
follow parent project
2 parents 0078abc + 9063d3a commit cda3354

File tree

1 file changed

+27
-40
lines changed

1 file changed

+27
-40
lines changed

src/main/resources/super/io/vavr/concurrent/FutureImpl.java

Lines changed: 27 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ final class FutureImpl<T> implements Future<T> {
3838
/**
3939
* Used to start new threads.
4040
*/
41-
private final ExecutorService executorService;
41+
private final Executor executor;
4242

4343
/**
4444
* Used to synchronize state changes.
@@ -64,24 +64,15 @@ final class FutureImpl<T> implements Future<T> {
6464
@GuardedBy("lock")
6565
private Queue<Consumer<Try<T>>> actions;
6666

67-
/**
68-
* Once a computation is started via run(), job is defined and used to control the lifecycle of the computation.
69-
* <p>
70-
* The {@code java.util.concurrent.Future} is not intended to store the result of the computation, it is stored in
71-
* {@code value} instead.
72-
*/
73-
@GuardedBy("lock")
74-
private java.util.concurrent.Future<?> job;
75-
7667
// single constructor
77-
private FutureImpl(ExecutorService executorService, Option<Try<T>> value, Queue<Consumer<Try<T>>> actions, CheckedFunction1<FutureImpl<T>, java.util.concurrent.Future<?>> jobFactory) {
78-
this.executorService = executorService;
68+
private FutureImpl(Executor executor, Option<Try<T>> value, Queue<Consumer<Try<T>>> actions, Computation<T> computation) {
69+
this.executor = executor;
7970
synchronized (lock) {
8071
this.cancelled = false;
8172
this.value = value;
8273
this.actions = actions;
8374
try {
84-
this.job = jobFactory.apply(this);
75+
computation.execute(this::tryComplete, this::updateThread);
8576
} catch(Throwable x) {
8677
tryComplete(Try.failure(x));
8778
}
@@ -91,46 +82,44 @@ private FutureImpl(ExecutorService executorService, Option<Try<T>> value, Queue<
9182
/**
9283
* Creates a {@code FutureImpl} that is immediately completed with the given value. No task will be started.
9384
*
94-
* @param executorService An {@link ExecutorService} to run and control the computation and to perform the actions.
85+
* @param executor An {@link Executor} to run and control the computation and to perform the actions.
9586
* @param value the result of this Future
9687
*/
9788
@SuppressWarnings("unchecked")
98-
static <T> FutureImpl<T> of(ExecutorService executorService, Try<? extends T> value) {
99-
return new FutureImpl<>(executorService, Option.some(Try.narrow(value)), null, ignored -> null);
89+
static <T> FutureImpl<T> of(Executor executor, Try<? extends T> value) {
90+
return new FutureImpl<>(executor, Option.some(Try.narrow(value)), null, (tryComplete, updateThread) -> {});
10091
}
10192

10293
/**
10394
* Creates a {@code FutureImpl} that is eventually completed.
10495
* The given {@code computation} is <em>synchronously</em> executed, no thread is started.
10596
*
106-
* @param executorService An {@link ExecutorService} to run and control the computation and to perform the actions.
97+
* @param executor An {@link Executor} to run and control the computation and to perform the actions.
10798
* @param computation A non-blocking computation
10899
* @param <T> value type of the Future
109100
* @return a new {@code FutureImpl} instance
110101
*/
111-
static <T> FutureImpl<T> sync(ExecutorService executorService, CheckedConsumer<Predicate<Try<? extends T>>> computation) {
112-
return new FutureImpl<>(executorService, Option.none(), Queue.empty(), future -> {
113-
computation.accept(future::tryComplete);
114-
return null;
102+
static <T> FutureImpl<T> sync(Executor executor, CheckedConsumer<Predicate<Try<? extends T>>> computation) {
103+
return new FutureImpl<>(executor, Option.none(), Queue.empty(), (tryComplete, updateThread) -> {
104+
computation.accept(tryComplete);
115105
});
116106
}
117107

118108
/**
119109
* Creates a {@code FutureImpl} that is eventually completed.
120110
* The given {@code computation} is <em>asynchronously</em> executed, a new thread is started.
121111
*
122-
* @param executorService An {@link ExecutorService} to run and control the computation and to perform the actions.
112+
* @param executor An {@link Executor} to run and control the computation and to perform the actions.
123113
* @param computation A (possibly blocking) computation
124114
* @param <T> value type of the Future
125115
* @return a new {@code FutureImpl} instance
126116
*/
127-
static <T> FutureImpl<T> async(ExecutorService executorService, CheckedConsumer<Predicate<Try<? extends T>>> computation) {
128-
// In a single-threaded context this Future may already have been completed during initialization.
129-
return new FutureImpl<>(executorService, Option.none(), Queue.empty(), future -> executorService.submit(() -> {
117+
static <T> FutureImpl<T> async(Executor executor, CheckedConsumer<Predicate<Try<? extends T>>> computation) {
118+
return new FutureImpl<>(executor, Option.none(), Queue.empty(), (tryComplete, updateThread) -> executor.execute(() -> {
130119
try {
131-
computation.accept(future::tryComplete);
120+
computation.accept(tryComplete);
132121
} catch (Throwable x) {
133-
future.tryComplete(Try.failure(x));
122+
tryComplete.test(Try.failure(x));
134123
}
135124
}));
136125
}
@@ -148,22 +137,17 @@ public Future<T> await(long timeout, TimeUnit unit) {
148137
@Override
149138
public Future<T> cancel(boolean mayInterruptIfRunning) {
150139
if (!isCompleted()) {
151-
synchronized (lock) {
152-
Try.of(() -> job == null || job.cancel(mayInterruptIfRunning))
153-
.recover(ignored -> job != null && job.isCancelled())
154-
.onSuccess(cancelled -> {
155-
if (cancelled) {
156-
this.cancelled = tryComplete(Try.failure(new CancellationException()));
157-
}
158-
});
159-
}
140+
this.cancelled = tryComplete(Try.failure(new CancellationException()));
160141
}
161142
return this;
162143
}
163144

145+
private void updateThread() {
146+
}
147+
164148
@Override
165-
public ExecutorService executorService() {
166-
return executorService;
149+
public Executor executor() {
150+
return executor;
167151
}
168152

169153
@Override
@@ -235,7 +219,6 @@ private boolean tryComplete(Try<? extends T> value) {
235219
actions = this.actions;
236220
this.value = Option.some(Try.narrow(value));
237221
this.actions = null;
238-
this.job = null;
239222
}
240223
}
241224
if (actions != null) {
@@ -249,9 +232,13 @@ private boolean tryComplete(Try<? extends T> value) {
249232

250233
private void perform(Consumer<? super Try<T>> action) {
251234
try {
252-
executorService.execute(() -> action.accept(value.get()));
235+
executor.execute(() -> action.accept(value.get()));
253236
} catch(Throwable x) {
254237
// ignored // TODO: tell UncaughtExceptionHandler?
255238
}
256239
}
240+
241+
private interface Computation<T> {
242+
void execute(Predicate<Try<? extends T>> tryComplete, Runnable updateThread) throws Throwable;
243+
}
257244
}

0 commit comments

Comments
 (0)