1
1
use std:: {
2
- sync:: atomic:: { AtomicI32 , AtomicU32 , Ordering } ,
2
+ sync:: atomic:: { AtomicI32 , AtomicU32 , Ordering , self } ,
3
3
thread,
4
4
time:: { Duration , Instant } ,
5
5
} ;
@@ -74,8 +74,12 @@ impl Condvar {
74
74
"CRITICAL: too many waiters"
75
75
) ;
76
76
let mutex = unsafe { lock_api:: MutexGuard :: < ' _ , PiLock , T > :: mutex ( mutex_guard) . raw ( ) } ;
77
- if mutex. is_locked ( ) {
78
- mutex. perform_unlock ( ) ;
77
+ macro_rules! unlock {
78
+ ( ) => {
79
+ if mutex. is_locked( ) {
80
+ mutex. perform_unlock( ) ;
81
+ }
82
+ } ;
79
83
}
80
84
let fx: & Futex < Private > = self . fx . as_futex ( ) ;
81
85
let result = if let Some ( timeout) = timeout {
@@ -84,6 +88,8 @@ impl Condvar {
84
88
let Some ( remaining) = timeout. checked_sub ( now. elapsed ( ) ) else {
85
89
break WaitTimeoutResult { timed_out : true } ;
86
90
} ;
91
+ unlock ! ( ) ;
92
+ //atomic::fence(Ordering::SeqCst);
87
93
match fx. wait_for ( 0 , remaining) {
88
94
Ok ( ( ) ) => break WaitTimeoutResult { timed_out : false } ,
89
95
Err ( TimedWaitError :: TimedOut ) => break WaitTimeoutResult { timed_out : true } ,
@@ -93,6 +99,8 @@ impl Condvar {
93
99
}
94
100
} else {
95
101
loop {
102
+ unlock ! ( ) ;
103
+ //atomic::fence(Ordering::SeqCst);
96
104
match fx. wait ( 0 ) {
97
105
Ok ( ( ) ) => break WaitTimeoutResult { timed_out : false } ,
98
106
Err ( WaitError :: Interrupted ) => continue ,
@@ -108,25 +116,57 @@ impl Condvar {
108
116
/// Notifies one thread waiting on this condvar.
109
117
pub fn notify_one ( & self ) {
110
118
let fx: & Futex < Private > = self . fx . as_futex ( ) ;
119
+ let op_start = Instant :: now ( ) ;
120
+ let mut backoff = Backoff :: new ( ) ;
111
121
while self . waiters . load ( Ordering :: SeqCst ) > 0 && fx. wake ( 1 ) == 0 {
112
122
// there is a chance that some waiter has not been entered into the futex yet, waiting
113
123
// for it in a tiny spin loop
114
- thread :: yield_now ( ) ;
124
+ backoff . backoff ( ) ;
115
125
}
126
+ println ! ( "notify_one took {:?} ({})" , op_start. elapsed( ) , backoff. c) ;
116
127
}
117
128
118
129
/// Notifies all threads waiting on this condvar.
119
130
pub fn notify_all ( & self ) {
120
131
let fx: & Futex < Private > = self . fx . as_futex ( ) ;
132
+ let mut backoff = Backoff :: new ( ) ;
121
133
loop {
122
134
let to_wake = self . waiters . load ( Ordering :: SeqCst ) ;
123
135
if to_wake == 0 || fx. wake ( to_wake) == to_wake {
124
136
break ;
125
137
}
126
138
// there is a chance that some waiter has not been entered into the futex yet, waiting
127
139
// for it in a tiny spin loop
128
- thread:: yield_now ( ) ;
140
+ backoff. backoff ( ) ;
141
+ }
142
+ }
143
+ }
144
+
145
+ struct Backoff {
146
+ n : u32 ,
147
+ c : u32 ,
148
+ }
149
+
150
+ impl Backoff {
151
+ fn new ( ) -> Self {
152
+ Self { n : 0 , c : 0 }
153
+ }
154
+
155
+ fn backoff ( & mut self ) {
156
+ if self . n == 0 {
157
+ // first iteration = spin
158
+ for _ in 0 ..10 {
159
+ thread:: yield_now ( ) ;
160
+ }
161
+ } else {
162
+ // otherwise sleep
163
+ thread:: sleep ( Duration :: from_micros ( self . n . into ( ) ) ) ;
164
+ }
165
+ if self . n < 100 {
166
+ // max sleep time = 100us
167
+ self . n += 50 ;
129
168
}
169
+ self . c += 1 ;
130
170
}
131
171
}
132
172
0 commit comments