Skip to content

Commit 4f164de

Browse files
committed
improve readability, provenance compliance, and documentation for shared arc waker
1 parent 1d0be87 commit 4f164de

File tree

5 files changed

+141
-88
lines changed

5 files changed

+141
-88
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ harness = false
2828
bitvec = { version = "1.0.1", default-features = false, features = ["alloc"] }
2929
futures-core = "0.3"
3030
pin-project = "1.0.8"
31+
sptr = "0.3.2"
3132

3233
[dev-dependencies]
3334
futures = "0.3.25"

src/utils/wakers/array/waker_array.rs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use core::task::Waker;
33
use std::sync::{Arc, Mutex};
44

55
use super::{
6-
super::shared_arc::{waker_for_wake_data_slot, WakeDataContainer},
6+
super::shared_arc::{waker_from_redirect_position, SharedArcContent},
77
ReadinessArray,
88
};
99

@@ -15,7 +15,7 @@ pub(crate) struct WakerArray<const N: usize> {
1515

1616
/// See [super::super::shared_arc] for how this works.
1717
struct WakerArrayInner<const N: usize> {
18-
wake_data: [*const Self; N],
18+
redirect: [*const Self; N],
1919
readiness: Mutex<ReadinessArray<N>>,
2020
}
2121

@@ -24,7 +24,7 @@ impl<const N: usize> WakerArray<N> {
2424
pub(crate) fn new() -> Self {
2525
let mut inner = Arc::new(WakerArrayInner {
2626
readiness: Mutex::new(ReadinessArray::new()),
27-
wake_data: [std::ptr::null(); N], // We don't know the Arc's address yet so put null for now.
27+
redirect: [std::ptr::null(); N], // We don't know the Arc's address yet so put null for now.
2828
});
2929
let raw = Arc::into_raw(Arc::clone(&inner)); // The Arc's address.
3030

@@ -33,14 +33,14 @@ impl<const N: usize> WakerArray<N> {
3333
// So N Wakers -> count = N+1.
3434
unsafe { Arc::decrement_strong_count(raw) }
3535

36-
// Make wake_data all point to the Arc itself.
37-
Arc::get_mut(&mut inner).unwrap().wake_data = [raw; N];
36+
// Make redirect all point to the Arc itself.
37+
Arc::get_mut(&mut inner).unwrap().redirect = [raw; N];
3838

39-
// Now the wake_data array is complete. Time to create the actual Wakers.
39+
// Now the redirect array is complete. Time to create the actual Wakers.
4040
let wakers = array::from_fn(|i| {
41-
let data = inner.wake_data.get(i).unwrap();
41+
let data = inner.redirect.get(i).unwrap();
4242
unsafe {
43-
waker_for_wake_data_slot::<WakerArrayInner<N>>(
43+
waker_from_redirect_position::<WakerArrayInner<N>>(
4444
data as *const *const WakerArrayInner<N>,
4545
)
4646
}
@@ -59,9 +59,9 @@ impl<const N: usize> WakerArray<N> {
5959
}
6060
}
6161

62-
impl<const N: usize> WakeDataContainer for WakerArrayInner<N> {
63-
fn get_wake_data_slice(&self) -> &[*const Self] {
64-
&self.wake_data
62+
impl<const N: usize> SharedArcContent for WakerArrayInner<N> {
63+
fn get_redirect_slice(&self) -> &[*const Self] {
64+
&self.redirect
6565
}
6666

6767
fn wake_index(&self, index: usize) {

src/utils/wakers/mod.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,11 @@
1+
//! Wakers that track when they are woken.
2+
//!
3+
//! By tracking which subfutures have woken, we can avoid having to re-poll N subfutures every time.
4+
//! This tracking is done by a [ReadinessArray]/[ReadinessVec]. These store the indexes of the subfutures that have woken.
5+
//! Each subfuture is given a Waker when polled.
6+
//! This waker must know the index of its corresponding subfuture so that it can update Readiness correctly.
7+
//!
8+
19
mod array;
210
mod shared_arc;
311
mod vec;

src/utils/wakers/shared_arc.rs

Lines changed: 110 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -1,92 +1,136 @@
1-
use core::task::{RawWaker, RawWakerVTable, Waker};
2-
use std::sync::Arc;
1+
//! To save on allocations, we avoid making a separate Arc Waker for every subfuture.
2+
//! Rather, we have all N Wakers share a single Arc, and use a "redirect" mechanism to allow different wakers to be distinguished.
3+
//! The mechanism works as follows.
4+
//! The Arc contains 2 things:
5+
//! - the Readiness structure ([ReadinessArray][super::array::ReadinessArray] / [ReadinessVec][super::vec::ReadinessVec])
6+
//! - the redirect array.
7+
//! The redirect array contains N repeated copies of the pointer to the Arc itself (obtained by `Arc::into_raw`).
8+
//! The Waker for the `i`th subfuture points to the `i`th item in the redirect array.
9+
//! (i.e. the Waker pointer is `*const *const A` where `A` is the type of the item in the Arc)
10+
//! When the Waker is woken, we deref it twice (giving reference to the content of the Arc),
11+
//! and compare the Waker's own pointer address to the starting address of the redirect slice found there.
12+
//! The difference tells us the index of the waker. We can then record this woken index in the Readiness.
13+
//!
14+
//! ```text
15+
//! ┌───────────────────────────┬──────────────┬──────────────┐
16+
//! │ │ │ │
17+
//! │ / ┌─────────────┬──────┼───────┬──────┼───────┬──────┼───────┬─────┐ \
18+
//! ▼ / │ │ │ │ │ │ │ │ │ \
19+
//! Arc < │ Readiness │ redirect[0] │ redirect[1] │ redirect[2] │ ... │ >
20+
//! ▲ \ │ │ │ │ │ │ /
21+
//! │ \ └─────────────┴──────▲───────┴──────▲───────┴──────▲───────┴─────┘ /
22+
//! │ │ │ │
23+
//! └─┐ ┌───────────────┘ │ │
24+
//! │ │ │ │
25+
//! │ │ ┌──────────────────┘ │
26+
//! │ │ │ │
27+
//! │ │ │ ┌─────────────────────┘
28+
//! │ │ │ │
29+
//! │ │ │ │
30+
//! ┌────┼────┬────┼──────┬────┼──────┬────┼──────┬─────┐
31+
//! │ │ │ │ │ │ │ │ │ │
32+
//! │ │ wakers[0] │ wakers[1] │ wakers[2] │ ... │
33+
//! │ │ │ │ │ │
34+
//! └─────────┴───────────┴───────────┴───────────┴─────┘
35+
//! ```
336
4-
// In the diagram below, `A` is the upper block.
5-
// It is a struct that implements WakeDataContainer (so either WakerVecInner or WakerArrayInner).
6-
// The lower block is either WakerVec or WakerArray. Each waker there points to a slot of wake_data in `A`.
7-
// Every one of these slots contain a pointer to the Arc wrapping `A` itself.
8-
// Wakers figure out their indices by comparing the address they are pointing to to `wake_data`'s start address.
9-
//
10-
// ┌───────────────────────────┬──────────────┬──────────────┐
11-
// │ │ │ │
12-
// │ / ┌─────────────┬──────┼───────┬──────┼───────┬──────┼───────┬─────┐ \
13-
// ▼ / │ │ │ │ │ │ │ │ │ \
14-
// Arc < │ Readiness │ wake_data[0] │ wake_data[1] │ wake_data[2] │ ... │ >
15-
// ▲ \ │ │ │ │ │ │ /
16-
// │ \ └─────────────┴──────▲───────┴──────▲───────┴──────▲───────┴─────┘ /
17-
// │ │ │ │
18-
// └─┐ ┌───────────────┘ │ │
19-
// │ │ │ │
20-
// │ │ ┌──────────────────┘ │
21-
// │ │ │ │
22-
// │ │ │ ┌─────────────────────┘
23-
// │ │ │ │
24-
// │ │ │ │
25-
// ┌────┼────┬────┼──────┬────┼──────┬────┼──────┬─────┐
26-
// │ │ │ │ │ │ │ │ │ │
27-
// │ Inner │ wakers[0] │ wakers[1] │ wakers[2] │ ... │
28-
// │ │ │ │ │ │
29-
// └─────────┴───────────┴───────────┴───────────┴─────┘
37+
// TODO: Right now each waker gets its own redirect slot.
38+
// We can save space by making size_of::<*const _>() wakers share the same slot.
39+
// With such a change, on a 64-bit system, the redirect array/vec would only need ⌈N/8⌉ slots instead of N.
3040

31-
// TODO: Right now each waker gets its own wake_data slot.
32-
// We can save space by making size_of::<usize>() wakers share the same slot.
33-
// With such change, in 64-bit system, the wake_data array/vec would only need ⌈N/8⌉ slots instead of N.
41+
use core::task::{RawWaker, RawWakerVTable, Waker};
42+
use std::sync::Arc;
3443

35-
pub(super) trait WakeDataContainer {
36-
/// Get the reference of the wake_data slice. This is used to compute the index.
37-
fn get_wake_data_slice(&self) -> &[*const Self];
44+
/// A trait to be implemented on the inner types of [super::WakerArray] and [super::WakerVec] (`WakerArrayInner` and `WakerVecInner`) for polymorphism.
45+
/// These are the types that go in the Arc. They both contain the Readiness and the redirect array/vec.
46+
pub(super) trait SharedArcContent {
47+
/// Get the reference of the redirect slice. This is used to compute the index.
48+
fn get_redirect_slice(&self) -> &[*const Self];
3849
/// Called when the subfuture at the specified index should be polled.
50+
/// Should call `Readiness::set_ready`.
3951
fn wake_index(&self, index: usize);
4052
}
41-
pub(super) unsafe fn waker_for_wake_data_slot<A: WakeDataContainer>(
53+
54+
/// Create one waker following the mechanism described in the [module][self] doc.
55+
/// The following must be upheld for safety:
56+
/// - `pointer` must point to a slot in the redirect array.
57+
/// - that slot must contain a pointer obtained by `Arc::<A>::into_raw`.
58+
/// - the Arc must still be alive at the time this function is called.
59+
/// The following should be upheld for correct behavior:
60+
/// - calling `SharedArcContent::get_redirect_slice` on the content of the Arc should give the redirect array that `pointer` points into.
61+
#[deny(unsafe_op_in_unsafe_fn)]
62+
pub(super) unsafe fn waker_from_redirect_position<A: SharedArcContent>(
4263
pointer: *const *const A,
4364
) -> Waker {
44-
unsafe fn clone_waker<A: WakeDataContainer>(pointer: *const ()) -> RawWaker {
65+
/// Create a Waker from a type-erased pointer.
66+
/// The pointer must satisfy the safety constraints listed in the wrapping function's documentation.
67+
unsafe fn create_waker<A: SharedArcContent>(pointer: *const ()) -> RawWaker {
68+
// Retype the type-erased pointer.
4569
let pointer = pointer as *const *const A;
46-
let raw = *pointer; // This is the raw pointer of Arc<Inner>.
4770

4871
// We're creating a new Waker, so we need to increment the count.
49-
Arc::increment_strong_count(raw);
72+
// SAFETY: The constraints listed in the wrapping function's documentation mean
73+
// - `*pointer` is an `*const A` obtained from `Arc::<A>::into_raw`.
74+
// - the Arc is alive.
75+
// So this operation is safe.
76+
unsafe { Arc::increment_strong_count(*pointer) };
5077

5178
RawWaker::new(pointer as *const (), create_vtable::<A>())
5279
}
5380

54-
// Convert a pointer to a wake_data slot to the Arc<Inner>.
55-
unsafe fn to_arc<A: WakeDataContainer>(pointer: *const *const A) -> Arc<A> {
56-
let raw = *pointer;
57-
Arc::from_raw(raw)
58-
}
59-
unsafe fn wake<A: WakeDataContainer, const BY_REF: bool>(pointer: *const ()) {
81+
/// Invoke `SharedArcContent::wake_index` with the index of the redirect slot that this pointer points to.
82+
/// The pointer must satisfy the safety constraints listed in the wrapping function's documentation.
83+
unsafe fn wake_by_ref<A: SharedArcContent>(pointer: *const ()) {
84+
// Retype the type-erased pointer.
6085
let pointer = pointer as *const *const A;
61-
let arc = to_arc::<A>(pointer);
62-
// Calculate the index
63-
let index = ((pointer as usize) // This is the slot our pointer points to.
64-
- (arc.get_wake_data_slice() as *const [*const A] as *const () as usize)) // This is the starting address of wake_data.
65-
/ std::mem::size_of::<*const A>();
6686

67-
arc.wake_index(index);
87+
// SAFETY: we are already requiring `pointer` to point to a slot in the redirect array.
88+
let raw: *const A = unsafe { *pointer };
89+
// SAFETY: we are already requiring the pointer in the redirect array slot to be obtained from `Arc::into_raw`.
90+
let arc_content: &A = unsafe { &*raw };
6891

69-
// Dropping the Arc would decrement the strong count.
70-
// We only want to do that when we're not waking by ref.
71-
if BY_REF {
72-
std::mem::forget(arc);
73-
} else {
74-
std::mem::drop(arc);
75-
}
92+
// Calculate the index.
93+
// This is your familiar pointer math
94+
// `item_address = array_address + (index * item_size)`
95+
// rearranged to
96+
// `index = (item_address - array_address) / item_size`.
97+
let item_address = sptr::Strict::addr(pointer);
98+
let redirect_slice_address = sptr::Strict::addr(arc_content.get_redirect_slice().as_ptr());
99+
let redirect_item_size = core::mem::size_of::<*const A>(); // the size of each item in the redirect slice
100+
let index = (item_address - redirect_slice_address) / redirect_item_size;
101+
102+
arc_content.wake_index(index);
76103
}
77-
unsafe fn drop_waker<A: WakeDataContainer>(pointer: *const ()) {
104+
105+
/// The pointer must satisfy the safety constraints listed in the wrapping function's documentation.
106+
unsafe fn drop_waker<A: SharedArcContent>(pointer: *const ()) {
107+
// Retype the type-erased pointer.
78108
let pointer = pointer as *const *const A;
79-
let arc = to_arc::<A>(pointer);
80-
// Decrement the strong count by dropping the Arc.
81-
std::mem::drop(arc);
109+
110+
// SAFETY: we are already requiring `pointer` to point to a slot in the redirect array.
111+
let raw = unsafe { *pointer };
112+
// SAFETY: we are already requiring the pointer in the redirect array slot to be obtained from `Arc::into_raw`.
113+
unsafe { Arc::decrement_strong_count(raw) };
82114
}
83-
fn create_vtable<A: WakeDataContainer>() -> &'static RawWakerVTable {
115+
116+
/// The pointer must satisfy the safety constraints listed in the wrapping function's documentation.
117+
unsafe fn wake<A: SharedArcContent>(pointer: *const ()) {
118+
// SAFETY: we are already requiring the constraints of `wake_by_ref` and `drop_waker`.
119+
unsafe {
120+
wake_by_ref::<A>(pointer);
121+
drop_waker::<A>(pointer);
122+
}
123+
}
124+
125+
fn create_vtable<A: SharedArcContent>() -> &'static RawWakerVTable {
84126
&RawWakerVTable::new(
85-
clone_waker::<A>,
86-
wake::<A, false>,
87-
wake::<A, true>,
127+
create_waker::<A>,
128+
wake::<A>,
129+
wake_by_ref::<A>,
88130
drop_waker::<A>,
89131
)
90132
}
91-
Waker::from_raw(clone_waker::<A>(pointer as *const ()))
133+
// SAFETY: All our vtable functions adhere to the RawWakerVTable contract,
134+
// and we are already requiring that `pointer` is what our functions expect.
135+
unsafe { Waker::from_raw(create_waker::<A>(pointer as *const ())) }
92136
}

src/utils/wakers/vec/waker_vec.rs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use core::task::Waker;
22
use std::sync::{Arc, Mutex};
33

44
use super::{
5-
super::shared_arc::{waker_for_wake_data_slot, WakeDataContainer},
5+
super::shared_arc::{waker_from_redirect_position, SharedArcContent},
66
ReadinessVec,
77
};
88

@@ -14,7 +14,7 @@ pub(crate) struct WakerVec {
1414

1515
/// See [super::super::shared_arc] for how this works.
1616
struct WakerVecInner {
17-
wake_data: Vec<*const Self>,
17+
redirect: Vec<*const Self>,
1818
readiness: Mutex<ReadinessVec>,
1919
}
2020

@@ -23,7 +23,7 @@ impl WakerVec {
2323
pub(crate) fn new(len: usize) -> Self {
2424
let mut inner = Arc::new(WakerVecInner {
2525
readiness: Mutex::new(ReadinessVec::new(len)),
26-
wake_data: Vec::new(),
26+
redirect: Vec::new(),
2727
});
2828
let raw = Arc::into_raw(Arc::clone(&inner)); // The Arc's address.
2929

@@ -32,15 +32,15 @@ impl WakerVec {
3232
// So N Wakers -> count = N+1.
3333
unsafe { Arc::decrement_strong_count(raw) }
3434

35-
// Make wake_data all point to the Arc itself.
36-
Arc::get_mut(&mut inner).unwrap().wake_data = vec![raw; len];
35+
// Make redirect all point to the Arc itself.
36+
Arc::get_mut(&mut inner).unwrap().redirect = vec![raw; len];
3737

38-
// Now the wake_data vec is complete. Time to create the actual Wakers.
38+
// Now the redirect vec is complete. Time to create the actual Wakers.
3939
let wakers = inner
40-
.wake_data
40+
.redirect
4141
.iter()
4242
.map(|data| unsafe {
43-
waker_for_wake_data_slot::<WakerVecInner>(data as *const *const WakerVecInner)
43+
waker_from_redirect_position::<WakerVecInner>(data as *const *const WakerVecInner)
4444
})
4545
.collect();
4646

@@ -56,9 +56,9 @@ impl WakerVec {
5656
}
5757
}
5858

59-
impl WakeDataContainer for WakerVecInner {
60-
fn get_wake_data_slice(&self) -> &[*const Self] {
61-
&self.wake_data
59+
impl SharedArcContent for WakerVecInner {
60+
fn get_redirect_slice(&self) -> &[*const Self] {
61+
&self.redirect
6262
}
6363

6464
fn wake_index(&self, index: usize) {

0 commit comments

Comments
 (0)