Skip to content

Commit f05358d

Browse files
committed
wip
1 parent c304b84 commit f05358d

File tree

6 files changed

+167
-45
lines changed

6 files changed

+167
-45
lines changed

Cargo.toml

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,19 +15,16 @@ object-id = "0.1"
1515
thiserror = "1.0"
1616
rtsc-derive = "0.1"
1717
pin-project = "1.1"
18+
parking_lot_rt = "0.12"
1819

1920
[target.'cfg(target_os = "linux")'.dependencies]
2021
lock_api = "0.4.12"
2122
libc = "0.2.155"
2223
linux-futex = "1.0.0"
2324

24-
[target.'cfg(not(target_os = "linux"))'.dependencies]
25-
parking_lot_rt = "0.12"
26-
2725
[dev-dependencies]
2826
insta = "1.39"
2927
tokio = { version = "1.36", features = ["rt", "macros", "time"] }
30-
parking_lot_rt = "0.12"
3128

3229
[features]
3330
serde = ["lock_api/serde"]

src/buf.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ mod test {
6969

7070
#[test]
7171
fn test_data_buffer() {
72-
let buf: DataBuffer<i32> = DataBuffer::bounded(3);
72+
let buf: DataBuffer<_> = DataBuffer::bounded(3);
7373
assert_eq!(buf.len(), 0);
7474
buf.try_push(1);
7575
assert_eq!(buf.len(), 1);
@@ -85,7 +85,7 @@ mod test {
8585

8686
#[test]
8787
fn test_data_buffer_other_mutex() {
88-
let buf: DataBuffer<i32> = DataBuffer::bounded(3);
88+
let buf: DataBuffer<i32, parking_lot_rt::RawMutex> = DataBuffer::bounded(3);
8989
assert_eq!(buf.len(), 0);
9090
}
9191
}

src/cell/datacell.rs

Lines changed: 63 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,47 +1,69 @@
1-
use crate::{Error, Result};
1+
use crate::{
2+
condvar_api::RawCondvar,
3+
locking::{Condvar, RawMutex},
4+
Error, Result,
5+
};
6+
use lock_api::RawMutex as RawMutexTrait;
27
use std::{sync::Arc, time::Duration};
38

4-
use crate::locking::{Condvar, Mutex};
5-
69
struct CellValue<P> {
710
current: Option<P>,
811
closed: bool,
912
}
1013

11-
struct DataCellInner<P> {
12-
value: Mutex<CellValue<P>>,
13-
data_available: Condvar,
14+
impl<P> Default for CellValue<P> {
15+
fn default() -> Self {
16+
Self {
17+
current: None,
18+
closed: false,
19+
}
20+
}
21+
}
22+
23+
struct DataCellInner<P, M, CV> {
24+
value: lock_api::Mutex<M, CellValue<P>>,
25+
data_available: CV,
1426
}
1527

1628
/// A simple data cell that can be used to pass data between threads. Acts similarly to a
1729
/// ring-buffer channel with a capacity of 1.
18-
pub struct DataCell<P> {
19-
inner: Arc<DataCellInner<P>>,
30+
pub struct DataCell<P, M = RawMutex, CV = Condvar>
31+
{
32+
inner: Arc<DataCellInner<P, M, CV>>,
2033
}
2134

22-
impl<P> Default for DataCell<P> {
35+
impl<P, M, CV> Default for DataCell<P, M, CV>
36+
where
37+
M: RawMutexTrait,
38+
CV: RawCondvar,
39+
{
2340
fn default() -> Self {
2441
Self {
2542
inner: Arc::new(DataCellInner {
26-
value: Mutex::new(CellValue {
27-
current: None,
28-
closed: false,
29-
}),
30-
data_available: Condvar::new(),
43+
value: <_>::default(),
44+
data_available: CV::new(),
3145
}),
3246
}
3347
}
3448
}
3549

36-
impl<P> Clone for DataCell<P> {
50+
impl<P, M, CV> Clone for DataCell<P, M, CV>
51+
where
52+
M: RawMutexTrait,
53+
CV: RawCondvar,
54+
{
3755
fn clone(&self) -> Self {
3856
Self {
3957
inner: self.inner.clone(),
4058
}
4159
}
4260
}
4361

44-
impl<P> DataCell<P> {
62+
impl<P, M, CV> DataCell<P, M, CV>
63+
where
64+
M: RawMutexTrait,
65+
CV: RawCondvar + RawCondvar<RawMutex = M>,
66+
{
4567
/// Creates a new data cell
4668
pub fn new() -> Self {
4769
Self::default()
@@ -80,7 +102,9 @@ impl<P> DataCell<P> {
80102
if let Some(current) = value.current.take() {
81103
return Ok(current);
82104
}
83-
self.inner.data_available.wait(&mut value);
105+
self.inner
106+
.data_available
107+
.wait::<CellValue<P>, M>(&mut value);
84108
}
85109
}
86110
/// Retrieves the data from the cell with the given timeout
@@ -96,7 +120,7 @@ impl<P> DataCell<P> {
96120
if self
97121
.inner
98122
.data_available
99-
.wait_for(&mut value, timeout)
123+
.wait_for::<CellValue<P>, M>(&mut value, timeout)
100124
.timed_out()
101125
{
102126
return Err(Error::Timeout);
@@ -113,7 +137,11 @@ impl<P> DataCell<P> {
113137
}
114138
}
115139

116-
impl<P> Iterator for DataCell<P> {
140+
impl<P, M, CV> Iterator for DataCell<P, M, CV>
141+
where
142+
M: RawMutexTrait,
143+
CV: RawCondvar + RawCondvar<RawMutex = M>,
144+
{
117145
type Item = P;
118146
fn next(&mut self) -> Option<Self::Item> {
119147
self.get().ok()
@@ -131,7 +159,20 @@ mod test {
131159

132160
#[test]
133161
fn test_datacell() {
134-
let cell = DataCell::new();
162+
let cell: DataCell<_> = DataCell::new();
163+
cell.set(32);
164+
let cell2 = cell.clone();
165+
let handle = thread::spawn(move || {
166+
thread::sleep(Duration::from_millis(100));
167+
cell2.set(42);
168+
});
169+
assert_eq!(cell.get().unwrap(), 42);
170+
handle.join().unwrap();
171+
}
172+
173+
#[test]
174+
fn test_datacell_other_mutex() {
175+
let cell: DataCell<_, parking_lot_rt::RawMutex, parking_lot_rt::Condvar> = DataCell::new();
135176
let cell2 = cell.clone();
136177
let handle = thread::spawn(move || {
137178
thread::sleep(Duration::from_millis(100));
@@ -143,7 +184,7 @@ mod test {
143184

144185
#[test]
145186
fn test_datacell_close() {
146-
let cell = DataCell::new();
187+
let cell: DataCell<_> = DataCell::new();
147188
let cell2 = cell.clone();
148189
let handle = thread::spawn(move || {
149190
cell2.set(42);
@@ -155,7 +196,7 @@ mod test {
155196

156197
#[test]
157198
fn test_datacell_try_get() {
158-
let cell = DataCell::new();
199+
let cell: DataCell<_> = DataCell::new();
159200
assert_eq!(cell.try_get().unwrap_err(), Error::ChannelEmpty);
160201
let cell2 = cell.clone();
161202
let handle = thread::spawn(move || {

src/condvar_api.rs

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
use std::time::Duration;
2+
3+
use lock_api::{MutexGuard, RawMutex};
4+
5+
/// The `Condvar` type trait
6+
pub trait RawCondvar {
7+
/// The mutex type
8+
type RawMutex: RawMutex;
9+
10+
/// Create a new `Condvar`
11+
fn new() -> Self;
12+
/// Wait on the `Condvar`
13+
fn wait<T, M>(&self, mutex_guard: &mut MutexGuard<'_, Self::RawMutex, T>);
14+
/// Wait on the `Condvar` with a timeout
15+
fn wait_for<T, M>(
16+
&self,
17+
mutex_guard: &mut MutexGuard<'_, Self::RawMutex, T>,
18+
timeout: Duration,
19+
) -> WaitTimeoutResult;
20+
/// Notify one waiter
21+
fn notify_one(&self);
22+
/// Notify all waiters
23+
fn notify_all(&self);
24+
}
25+
26+
/// Result, returned by [`Condvar::wait_for`].
27+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
28+
pub struct WaitTimeoutResult {
29+
timed_out: bool,
30+
}
31+
32+
impl WaitTimeoutResult {
33+
/// Create a new `WaitTimeoutResult`
34+
pub fn new(timed_out: bool) -> Self {
35+
Self { timed_out }
36+
}
37+
/// Returns `true` if the wait timed out.
38+
pub fn timed_out(&self) -> bool {
39+
self.timed_out
40+
}
41+
}
42+
43+
impl RawCondvar for parking_lot_rt::Condvar {
44+
type RawMutex = parking_lot_rt::RawMutex;
45+
46+
fn new() -> Self {
47+
parking_lot_rt::Condvar::new()
48+
}
49+
50+
fn wait<T, M>(&self, mutex_guard: &mut MutexGuard<'_, Self::RawMutex, T>) {
51+
self.wait(mutex_guard)
52+
}
53+
54+
fn wait_for<T, M>(
55+
&self,
56+
mutex_guard: &mut MutexGuard<'_, Self::RawMutex, T>,
57+
timeout: Duration,
58+
) -> WaitTimeoutResult {
59+
WaitTimeoutResult::new(self.wait_for(mutex_guard, timeout).timed_out())
60+
}
61+
62+
fn notify_one(&self) {
63+
self.notify_one();
64+
}
65+
66+
fn notify_all(&self) {
67+
self.notify_all();
68+
}
69+
}

src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ pub use bma_ts;
3131
pub mod base_channel;
3232
/// Base async channel type, allows to build async channels with a custom storage
3333
pub mod base_channel_async;
34+
/// Conditional traits
35+
pub mod condvar_api;
3436
/// Time-limited operations
3537
pub mod ops;
3638
/// Policy-based deque

src/pi.rs

Lines changed: 30 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ use std::{
77
use linux_futex::{AsFutex as _, Futex, PiFutex, Private, TimedWaitError, WaitError};
88
use lock_api::{GuardSend, RawMutex as RawMutexTrait, RawMutexTimed};
99

10+
use crate::condvar_api::{RawCondvar, WaitTimeoutResult};
11+
1012
thread_local! {
1113
#[allow(clippy::cast_possible_truncation)]
1214
static TID: libc::pid_t = unsafe { libc::syscall(libc::SYS_gettid) as i32 }
@@ -17,19 +19,6 @@ fn tid() -> libc::pid_t {
1719
TID.with(|it| *it)
1820
}
1921

20-
/// Result, returned by [`Condvar::wait_for`].
21-
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
22-
pub struct WaitTimeoutResult {
23-
timed_out: bool,
24-
}
25-
26-
impl WaitTimeoutResult {
27-
/// Returns `true` if the wait timed out.
28-
pub fn timed_out(&self) -> bool {
29-
self.timed_out
30-
}
31-
}
32-
3322
/// Priority-inheritance based Condvar implementation for the priority-inheritance [`Mutex`].
3423
#[derive(Default)]
3524
pub struct Condvar {
@@ -86,12 +75,12 @@ impl Condvar {
8675
let now = Instant::now();
8776
loop {
8877
let Some(remaining) = timeout.checked_sub(now.elapsed()) else {
89-
break WaitTimeoutResult { timed_out: true };
78+
break WaitTimeoutResult::new(true)
9079
};
9180
unlock!();
9281
match fx.wait_for(0, remaining) {
93-
Ok(()) => break WaitTimeoutResult { timed_out: false },
94-
Err(TimedWaitError::TimedOut) => break WaitTimeoutResult { timed_out: true },
82+
Ok(()) => break WaitTimeoutResult::new(false),
83+
Err(TimedWaitError::TimedOut) => break WaitTimeoutResult::new(true),
9584
Err(TimedWaitError::Interrupted) => continue,
9685
Err(TimedWaitError::WrongValue) => unreachable!(),
9786
}
@@ -100,7 +89,7 @@ impl Condvar {
10089
loop {
10190
unlock!();
10291
match fx.wait(0) {
103-
Ok(()) => break WaitTimeoutResult { timed_out: false },
92+
Ok(()) => break WaitTimeoutResult::new(false),
10493
Err(WaitError::Interrupted) => continue,
10594
Err(WaitError::WrongValue) => unreachable!(),
10695
}
@@ -295,6 +284,30 @@ pub type MutexGuard<'a, T> = lock_api::MutexGuard<'a, PiLock, T>;
295284
/// Compatibility name
296285
pub type RawMutex = PiLock;
297286

287+
impl RawCondvar for Condvar {
288+
type RawMutex = PiLock;
289+
290+
fn new() -> Self {
291+
Self::new()
292+
}
293+
294+
fn wait<T, M>(&self, guard: &mut MutexGuard<T>) {
295+
self.wait(guard)
296+
}
297+
298+
fn wait_for<T, M>(&self, guard: &mut MutexGuard<T>, timeout: Duration) -> WaitTimeoutResult {
299+
self.wait_for(guard, timeout)
300+
}
301+
302+
fn notify_one(&self) {
303+
self.notify_one()
304+
}
305+
306+
fn notify_all(&self) {
307+
self.notify_all()
308+
}
309+
}
310+
298311
#[cfg(test)]
299312
mod tests {
300313
use std::{sync::Arc, thread, time::Duration};

0 commit comments

Comments
 (0)