1
1
use parking_lot:: { Mutex , Condvar } ;
2
2
use std:: mem;
3
3
use std:: sync:: { Arc } ;
4
- use std:: sync:: atomic:: { AtomicBool , AtomicUsize , Ordering } ;
5
4
use std:: sync:: mpsc:: { channel, Sender , Receiver } ;
6
5
use std:: thread;
7
6
use std:: time:: { Duration , Instant } ;
@@ -36,7 +35,6 @@ enum BuilderIn {
36
35
AppendSerialized {
37
36
value : AppendSerializedContext ,
38
37
} ,
39
- Close ,
40
38
}
41
39
42
40
///
@@ -46,13 +44,11 @@ struct BlockData {
46
44
block_info : BlockInfo ,
47
45
block_extra : BlockExtra ,
48
46
value_flow : ValueFlow ,
49
- stopped : bool ,
50
47
//log_time_gen: LogicalTimeGenerator,
51
48
end_lt : u64 , // biggest logical time of all messages
52
49
p1 : Duration ,
53
50
p2 : Duration ,
54
51
p3 : Duration ,
55
- counter : AtomicUsize
56
52
}
57
53
58
54
impl BlockData {
@@ -73,10 +69,9 @@ impl BlockData {
73
69
/// BlockBuilder structure
74
70
///
75
71
pub struct BlockBuilder {
76
- sender : Option < Arc < Mutex < Sender < BuilderIn > > > > ,
72
+ sender : Mutex < Option < Sender < BuilderIn > > > ,
77
73
current_block_data : Arc < Mutex < BlockData > > ,
78
74
stop_event : Arc < Condvar > ,
79
- stopped : AtomicBool ,
80
75
block_gen_utime : UnixTime32 ,
81
76
start_lt : u64 ,
82
77
}
@@ -139,20 +134,15 @@ impl BlockBuilder {
139
134
block_info : block_info,
140
135
block_extra : BlockExtra :: default ( ) ,
141
136
value_flow : ValueFlow :: default ( ) ,
142
- stopped : false ,
143
137
end_lt : 0 ,
144
138
p1 : Duration :: new ( 0 , 0 ) ,
145
139
p2 : Duration :: new ( 0 , 0 ) ,
146
140
p3 : Duration :: new ( 0 , 0 ) ,
147
- counter : AtomicUsize :: new ( 0 )
148
141
} ) ) ;
149
142
150
- let sender = Arc :: new ( Sender :: clone ( & sender) ) ;
151
-
152
143
let block = BlockBuilder {
153
- sender : Some ( Arc :: new ( Mutex :: new ( Sender :: clone ( & sender) ) ) ) ,
144
+ sender : Mutex :: new ( Some ( sender) ) ,
154
145
stop_event : stop_event. clone ( ) ,
155
- stopped : AtomicBool :: new ( false ) ,
156
146
current_block_data : current_block_data. clone ( ) ,
157
147
block_gen_utime,
158
148
start_lt,
@@ -176,17 +166,8 @@ impl BlockBuilder {
176
166
for msg in receiver {
177
167
let now = Instant :: now ( ) ;
178
168
match msg {
179
- BuilderIn :: Close => {
180
- let mut block_data = current_block_data. lock ( ) ;
181
- // set end logical time - logical time of last message of last transaction
182
- let end_lt = block_data. end_lt ;
183
- block_data. block_info . set_end_lt ( end_lt) ;
184
- block_data. stopped = true ;
185
- break ;
186
- } ,
187
169
BuilderIn :: Append { in_msg, out_msgs } => {
188
170
let mut block_data = current_block_data. lock ( ) ;
189
- block_data. counter . fetch_sub ( 1 , Ordering :: SeqCst ) ;
190
171
if let Err ( err) = Self :: append_messages ( & mut block_data, & in_msg, out_msgs) {
191
172
log:: error!( "error append messages {}" , err) ;
192
173
}
@@ -203,6 +184,11 @@ impl BlockBuilder {
203
184
let d = n. elapsed ( ) ;
204
185
current_block_data. lock ( ) . p3 = d;
205
186
187
+ let mut block_data = current_block_data. lock ( ) ;
188
+ // set end logical time - logical time of last message of last transaction
189
+ let end_lt = block_data. end_lt ;
190
+ block_data. block_info . set_end_lt ( end_lt) ;
191
+
206
192
stop_event. notify_one ( ) ;
207
193
}
208
194
@@ -290,8 +276,6 @@ impl BlockBuilder {
290
276
account_blocks. add_serialized_transaction ( & value. transaction , & value. transaction_cell ) ?;
291
277
block_data. block_extra . write_account_blocks ( & account_blocks) ?;
292
278
293
- block_data. counter . fetch_sub ( 1 , Ordering :: SeqCst ) ;
294
-
295
279
// calculate ValueFlow
296
280
// imported increase to in-value
297
281
if let Some ( in_value) = value. imported_value {
@@ -331,43 +315,30 @@ impl BlockBuilder {
331
315
/// Add transaction to block
332
316
///
333
317
pub fn add_transaction ( & self , in_msg : Arc < InMsg > , out_msgs : Vec < OutMsg > ) -> bool {
334
- if !self . stopped . load ( Ordering :: SeqCst ) {
335
- if let Some ( sender) = & self . sender {
336
- self . current_block_data . lock ( ) . counter . fetch_add ( 1 , Ordering :: SeqCst ) ;
337
- if sender. lock ( ) . send ( BuilderIn :: Append { in_msg, out_msgs } ) . is_err ( ) {
338
- return false
339
- }
340
- }
341
- return true
318
+ if let Some ( sender) = self . sender . lock ( ) . as_ref ( ) {
319
+ sender. send ( BuilderIn :: Append { in_msg, out_msgs } ) . is_ok ( )
320
+ } else {
321
+ false
342
322
}
343
- false
344
323
}
345
324
346
325
///
347
326
/// Add serialized transaction to block
348
327
///
349
328
pub fn add_serialized_transaction ( & self , value : AppendSerializedContext ) -> bool {
350
- if !self . stopped . load ( Ordering :: SeqCst ) {
351
- if let Some ( sender) = & self . sender {
352
- self . current_block_data . lock ( ) . counter . fetch_add ( 1 , Ordering :: SeqCst ) ;
353
- if sender. lock ( ) . send ( BuilderIn :: AppendSerialized { value } ) . is_err ( ) {
354
- return false
355
- }
356
- }
357
- return true
329
+ if let Some ( sender) = self . sender . lock ( ) . as_ref ( ) {
330
+ sender. send ( BuilderIn :: AppendSerialized { value } ) . is_ok ( )
331
+ } else {
332
+ false
358
333
}
359
- false
360
334
}
361
335
362
336
///
363
337
/// Stop processing messages thread.
364
338
///
365
339
fn brake_block_builder_thread ( & self ) {
366
- if let Some ( sender) = & self . sender {
367
- if sender. lock ( ) . send ( BuilderIn :: Close ) . is_err ( ) {
368
- error ! ( target: "node" , "try to break builder, but it is already broken" ) ;
369
- }
370
- }
340
+ let mut sender = self . sender . lock ( ) ;
341
+ * sender = None ;
371
342
}
372
343
373
344
///
@@ -400,24 +371,16 @@ impl BlockBuilder {
400
371
new_shard_state : & ShardStateUnsplit
401
372
) -> Result < ( Block , usize ) > {
402
373
403
- let mut time = [ 0u128 ; 6 ] ;
404
- let now = Instant :: now ( ) ;
374
+ let mut time = [ 0u128 ; 3 ] ;
375
+ let now = Instant :: now ( ) ;
376
+
377
+ let mut block_data = self . current_block_data . lock ( ) ;
405
378
406
- self . stopped . store ( true , Ordering :: SeqCst ) ;
407
379
self . brake_block_builder_thread ( ) ;
408
- self . stopped . store ( true , Ordering :: Relaxed ) ;
380
+ self . stop_event . wait ( & mut block_data ) ;
409
381
410
382
time[ 0 ] = now. elapsed ( ) . as_micros ( ) ;
411
- let now = Instant :: now ( ) ;
412
-
413
- let mut block_data = self . current_block_data . lock ( ) ;
414
- time[ 5 ] = block_data. counter . load ( Ordering :: SeqCst ) as u128 ;
415
- while !block_data. stopped {
416
- self . stop_event . wait ( & mut block_data) ;
417
- }
418
-
419
- time[ 1 ] = now. elapsed ( ) . as_micros ( ) ;
420
- let now = Instant :: now ( ) ;
383
+ let now = Instant :: now ( ) ;
421
384
422
385
// merkle updates for account_blocks calculating
423
386
let mut account_blocks = vec ! [ ] ;
@@ -460,7 +423,7 @@ info!(target: "ton_block", "want to remove shard state {}", str2);
460
423
& old_ss_root,
461
424
& new_ss_root) ?;
462
425
463
- time[ 2 ] = now. elapsed ( ) . as_micros ( ) ;
426
+ time[ 1 ] = now. elapsed ( ) . as_micros ( ) ;
464
427
let now = Instant :: now ( ) ;
465
428
466
429
let block = Block :: with_params (
@@ -471,15 +434,11 @@ let now = Instant::now();
471
434
mem:: take ( & mut block_data. block_extra ) ,
472
435
) ?;
473
436
474
- time[ 3 ] = now. elapsed ( ) . as_micros ( ) ;
475
- let now = Instant :: now ( ) ;
476
-
477
- // Let's calc blocks's id and save it in struct while block is accesed as mutable
437
+ time[ 2 ] = now. elapsed ( ) . as_micros ( ) ;
478
438
479
- time[ 4 ] = now. elapsed ( ) . as_micros ( ) ;
480
439
info ! ( target: "profiler" ,
481
- "Block builder time: {} / {} / {} / {} / {} / {} " ,
482
- time[ 0 ] , time[ 1 ] , time[ 2 ] , time [ 3 ] , time [ 4 ] , time [ 5 ]
440
+ "Block builder time: {} / {} / {}" ,
441
+ time[ 0 ] , time[ 1 ] , time[ 2 ]
483
442
) ;
484
443
info ! ( target: "profiler" ,
485
444
"Block builder thread time: {} / {} / {}" ,
0 commit comments