Skip to content

Commit f18baf6

Browse files
committed
Better caching of custom cancel method handles (fix error with unloading)
1 parent 8680f26 commit f18baf6

File tree

5 files changed

+282
-94
lines changed

5 files changed

+282
-94
lines changed

.classpath

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
<classpathentry excluding="**" kind="src" output="target/classes" path="src/main/resources">
1717
<attributes>
1818
<attribute name="maven.pomderived" value="true"/>
19+
<attribute name="optional" value="true"/>
1920
</attributes>
2021
</classpathentry>
2122
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-12">
@@ -33,6 +34,7 @@
3334
<attributes>
3435
<attribute name="test" value="true"/>
3536
<attribute name="maven.pomderived" value="true"/>
37+
<attribute name="optional" value="true"/>
3638
</attributes>
3739
</classpathentry>
3840
<classpathentry kind="output" path="target/classes"/>

src/main/java/net/tascalate/concurrent/core/Cache.java

Lines changed: 0 additions & 74 deletions
This file was deleted.

src/main/java/net/tascalate/concurrent/core/CancelMethodsCache.java

Lines changed: 24 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -39,27 +39,31 @@ public static interface Cancellation {
3939
private CancelMethodsCache() {}
4040

4141
public static Cancellation cancellationOf(Class<?> stageClass) {
42-
return CANCEL_METHODS.get(stageClass, LOOKUP_CANCEL_METHOD);
42+
return GET_CANCEL_METHOD_BY_CLASS.apply(stageClass);
4343
}
4444

45-
private static final Cache<Class<?>, Cancellation> CANCEL_METHODS = new Cache<>();
46-
private static final Cancellation NO_CANCELATION = (p, b) -> {
47-
System.err.println("Cancellation is not supported for promise " + p);
48-
return false;
49-
};
50-
private static final Function<Class<?>, Cancellation> LOOKUP_CANCEL_METHOD = c -> {
51-
Stream<Function<Class<?>, ExceptionalCancellation>> options = Stream.of(
52-
CancelMethodsCache::cancelInterruptibleMethodOf,
53-
CancelMethodsCache::cancelMethodOf,
54-
CancelMethodsCache::completeExceptionallyMethodOf
55-
);
56-
return options.map(option -> Optional.ofNullable( option.apply(c) ))
57-
.filter(Optional::isPresent)
58-
.map(Optional::get)
59-
.map(ExceptionalCancellation::unchecked)
60-
.findFirst()
61-
.orElse(NO_CANCELATION);
62-
};
45+
private static final MethodHandles.Lookup LOOKUP = MethodHandles.lookup();
46+
47+
private static final Function<Class<?>, Cancellation> GET_CANCEL_METHOD_BY_CLASS =
48+
new FunctionMemoization<>(c -> {
49+
50+
Cancellation NO_CANCELATION = (p, b) -> {
51+
System.err.println("Cancellation is not supported for promise " + p);
52+
return false;
53+
};
54+
55+
Stream<Function<Class<?>, ExceptionalCancellation>> options = Stream.of(
56+
CancelMethodsCache::cancelInterruptibleMethodOf,
57+
CancelMethodsCache::cancelMethodOf,
58+
CancelMethodsCache::completeExceptionallyMethodOf
59+
);
60+
return options.map(option -> Optional.ofNullable( option.apply(c) ))
61+
.filter(Optional::isPresent)
62+
.map(Optional::get)
63+
.map(ExceptionalCancellation::unchecked)
64+
.findFirst()
65+
.orElse(NO_CANCELATION);
66+
});
6367

6468
private static ExceptionalCancellation cancelInterruptibleMethodOf(Class<?> clazz) {
6569
try {
@@ -136,7 +140,7 @@ private static Method firstUnreflectableMethod(Class<?> clazz, Method m, Set<Cla
136140
}
137141

138142
private static MethodHandle unreflect(Method m) throws IllegalAccessException {
139-
return MethodHandles.publicLookup().unreflect(m);
143+
return LOOKUP.unreflect(m);
140144
}
141145

142146
@FunctionalInterface
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
package net.tascalate.concurrent.core;
2+
3+
import java.lang.ref.Reference;
4+
import java.lang.ref.ReferenceQueue;
5+
import java.util.concurrent.ConcurrentHashMap;
6+
import java.util.concurrent.ConcurrentMap;
7+
import java.util.function.Function;
8+
9+
class FunctionMemoization<K, V> implements Function<K, V> {
10+
private final ConcurrentMap<K, Object> producerMutexes = new ConcurrentHashMap<>();
11+
private final ConcurrentMap<Object, Object> valueMap = new ConcurrentHashMap<>();
12+
13+
private final Function<? super K, ? extends V> fn;
14+
private final ReferenceType keyRefType;
15+
private final ReferenceType valueRefType;
16+
private final ReferenceQueue<K> queue;
17+
18+
FunctionMemoization(Function<? super K, ? extends V> fn) {
19+
this(ReferenceType.WEAK, ReferenceType.SOFT, fn);
20+
}
21+
22+
FunctionMemoization(ReferenceType keyRefType, ReferenceType valueRefType, Function<? super K, ? extends V> fn) {
23+
this.fn = fn;
24+
this.keyRefType = keyRefType;
25+
this.valueRefType = valueRefType;
26+
this.queue = keyRefType.createKeyReferenceQueue();
27+
}
28+
29+
@Override
30+
public V apply(K key) {
31+
expungeStaleEntries();
32+
33+
Object lookupKeyRef = keyRefType.createLookupKey(key);
34+
Object valueRef;
35+
36+
// Try to get a cached value.
37+
valueRef = valueMap.get(lookupKeyRef);
38+
V value;
39+
40+
if (valueRef != null) {
41+
value = valueRefType.dereference(valueRef);
42+
if (value != null) {
43+
// A cached value was found.
44+
return value;
45+
}
46+
}
47+
48+
Object mutex = getOrCreateMutex(key);
49+
synchronized (mutex) {
50+
try {
51+
// Double-check after getting mutex
52+
valueRef = valueMap.get(lookupKeyRef);
53+
value = valueRef == null ? null : valueRefType.dereference(valueRef);
54+
if (value == null) {
55+
value = fn.apply(key);
56+
valueMap.put(
57+
keyRefType.createKeyReference(key, queue),
58+
valueRefType.createValueReference(value)
59+
);
60+
}
61+
} finally {
62+
producerMutexes.remove(key, mutex);
63+
}
64+
}
65+
66+
return value;
67+
}
68+
69+
public V forget(K key) {
70+
Object mutex = getOrCreateMutex(key);
71+
synchronized (mutex) {
72+
try {
73+
Object valueRef = valueMap.remove(keyRefType.createLookupKey(key));
74+
return valueRef == null ? null : valueRefType.dereference(valueRef);
75+
} finally {
76+
producerMutexes.remove(key, mutex);
77+
}
78+
}
79+
}
80+
81+
private Object getOrCreateMutex(K key) {
82+
Object createdMutex = new byte[0];
83+
Object existingMutex = producerMutexes.putIfAbsent(key, createdMutex);
84+
if (existingMutex != null) {
85+
return existingMutex;
86+
} else {
87+
return createdMutex;
88+
}
89+
}
90+
91+
private void expungeStaleEntries() {
92+
for (Reference<? extends K> ref; (ref = queue.poll()) != null;) {
93+
@SuppressWarnings("unchecked")
94+
Reference<K> keyRef = (Reference<K>) ref;
95+
// keyRef now is equal only to itself while referent is cleared already
96+
// so it's safe to remove it without ceremony (like getOrCreateMutex(keyRef) usage)
97+
valueMap.remove(keyRef);
98+
}
99+
}
100+
}

0 commit comments

Comments
 (0)