File tree Expand file tree Collapse file tree 2 files changed +42
-5
lines changed Expand file tree Collapse file tree 2 files changed +42
-5
lines changed Original file line number Diff line number Diff line change @@ -19,7 +19,7 @@ lazy_static! {
19
19
Script :: new( include_str!( "./redis-scripts/receiveMessage.lua" ) ) ;
20
20
}
21
21
22
- static JS_COMPAT_MAX_TIME_MILLIS : u64 = 9_999_999_000 ;
22
+ const JS_COMPAT_MAX_TIME_MILLIS : u64 = 9_999_999_000 ;
23
23
24
24
/// The main object of this library. Creates/Handles the redis connection and contains all the methods
25
25
#[ derive( Clone ) ]
@@ -212,7 +212,7 @@ impl<T: ConnectionLike> RsmqFunctions<T> {
212
212
. arg ( & key)
213
213
. cmd ( "ZCOUNT" )
214
214
. arg ( & key)
215
- . arg ( time. 0 )
215
+ . arg ( time. 0 * 1000 )
216
216
. arg ( "+inf" )
217
217
. query_async ( conn)
218
218
. await ?;
@@ -470,7 +470,7 @@ impl<T: ConnectionLike> RsmqFunctions<T> {
470
470
. query_async ( conn)
471
471
. await ?;
472
472
473
- let time_millis = ( result. 1 ) . 0 * 1000 ;
473
+ let time_micros = ( result. 1 ) . 0 * 1000000 + ( result . 1 ) . 1 ;
474
474
475
475
let ( hmget_first, hmget_second, hmget_third) =
476
476
match ( result. 0 . first ( ) , result. 0 . get ( 1 ) , result. 0 . get ( 2 ) ) {
@@ -479,7 +479,7 @@ impl<T: ConnectionLike> RsmqFunctions<T> {
479
479
} ;
480
480
481
481
let quid = if uid {
482
- Some ( radix_36 ( time_millis ) . to_string ( ) + & RsmqFunctions :: < T > :: make_id ( 22 ) ?)
482
+ Some ( radix_36 ( time_micros ) . to_string ( ) + & RsmqFunctions :: < T > :: make_id ( 22 ) ?)
483
483
} else {
484
484
None
485
485
} ;
@@ -494,7 +494,7 @@ impl<T: ConnectionLike> RsmqFunctions<T> {
494
494
maxsize : hmget_third
495
495
. parse ( )
496
496
. map_err ( |_| RsmqError :: CannotParseMaxsize ) ?,
497
- ts : time_millis ,
497
+ ts : time_micros / 1000 ,
498
498
uid : quid,
499
499
} )
500
500
}
Original file line number Diff line number Diff line change @@ -447,3 +447,40 @@ fn change_queue_size() {
447
447
assert_eq ! ( attributes. maxsize, -1 ) ;
448
448
} )
449
449
}
450
+
451
+ #[ test]
452
+ fn sent_messages_must_keep_order ( ) {
453
+ let rt = tokio:: runtime:: Runtime :: new ( ) . unwrap ( ) ;
454
+
455
+ rt. block_on ( async move {
456
+ let ctx = TestContext :: new ( ) ;
457
+ let connection = ctx. async_connection ( ) . await . unwrap ( ) ;
458
+ let mut rsmq = Rsmq :: new_with_connection ( connection, false , None ) ;
459
+
460
+ rsmq. create_queue ( "queue1" , None , None , None ) . await . unwrap ( ) ;
461
+
462
+ for i in 0 ..10000 {
463
+ rsmq. send_message ( "queue1" , format ! ( "testmessage{}" , i) , None )
464
+ . await
465
+ . unwrap ( ) ;
466
+ }
467
+
468
+ for i in 0 ..10000 {
469
+ let message = rsmq
470
+ . receive_message :: < String > ( "queue1" , None )
471
+ . await
472
+ . unwrap ( ) . unwrap ( ) ;
473
+ assert_eq ! ( message. message, format!( "testmessage{}" , i) ) ;
474
+
475
+ rsmq. delete_message ( "queue1" , & message. id ) . await . unwrap ( ) ;
476
+ }
477
+
478
+ let message = rsmq
479
+ . receive_message :: < String > ( "queue1" , None )
480
+ . await
481
+ . unwrap ( ) ;
482
+
483
+ assert ! ( message. is_none( ) ) ;
484
+ rsmq. delete_queue ( "queue1" ) . await . unwrap ( ) ;
485
+ } )
486
+ }
You can’t perform that action at this time.
0 commit comments