@@ -68,10 +68,10 @@ pub fn partition_key_to_shard_id(
68
68
} ;
69
69
70
70
for shard in shards {
71
- let start = BigInt :: from_str_radix ( & shard. start_hash_range_key , 16 ) . map_err ( |err| {
71
+ let start = BigInt :: from_str_radix ( & shard. start_hash_range_key , 10 ) . map_err ( |err| {
72
72
common:: Error :: PartitionKeyError ( common:: PartitionKeyError :: ParseBigIntError ( err) )
73
73
} ) ?;
74
- let end = BigInt :: from_str_radix ( & shard. end_hash_range_key , 16 ) . map_err ( |err| {
74
+ let end = BigInt :: from_str_radix ( & shard. end_hash_range_key , 10 ) . map_err ( |err| {
75
75
common:: Error :: PartitionKeyError ( common:: PartitionKeyError :: ParseBigIntError ( err) )
76
76
} ) ?;
77
77
@@ -85,9 +85,66 @@ pub fn partition_key_to_shard_id(
85
85
) )
86
86
}
87
87
88
+ #[ doc( hidden) ]
88
89
#[ macro_export]
89
90
macro_rules! format_url {
90
91
( $scheme: expr, $host: expr, $port: expr) => {
91
92
format!( "{}://{}:{}" , $scheme, $host, $port)
92
93
} ;
93
94
}
95
+
96
+ #[ cfg( test) ]
97
+ mod tests {
98
+ use std:: collections:: HashMap ;
99
+ use std:: env;
100
+
101
+ use hstreamdb_pb:: { ListShardsRequest , Stream } ;
102
+ use hstreamdb_test_utils:: rand_alphanumeric;
103
+
104
+ use super :: partition_key_to_shard_id;
105
+ use crate :: client:: Client ;
106
+ use crate :: ChannelProviderSettings ;
107
+
108
+ #[ tokio:: test( flavor = "multi_thread" ) ]
109
+ async fn test_partition_key_to_shard_id ( ) {
110
+ let addr = env:: var ( "TEST_SERVER_ADDR" ) . unwrap ( ) ;
111
+ let mut client = Client :: new (
112
+ addr,
113
+ ChannelProviderSettings {
114
+ concurrency_limit : None ,
115
+ } ,
116
+ )
117
+ . await
118
+ . unwrap ( ) ;
119
+
120
+ let stream_name = rand_alphanumeric ( 20 ) ;
121
+
122
+ client
123
+ . create_stream ( Stream {
124
+ stream_name : stream_name. clone ( ) ,
125
+ replication_factor : 1 ,
126
+ backlog_duration : 10 * 60 ,
127
+ shard_count : 200 ,
128
+ } )
129
+ . await
130
+ . unwrap ( ) ;
131
+
132
+ let shards = client
133
+ . channels
134
+ . channel ( )
135
+ . await
136
+ . list_shards ( ListShardsRequest { stream_name } )
137
+ . await
138
+ . unwrap ( )
139
+ . into_inner ( )
140
+ . shards ;
141
+
142
+ let mut result = HashMap :: new ( ) ;
143
+ for _ in 0 ..400 {
144
+ let shard_id = partition_key_to_shard_id ( & shards, rand_alphanumeric ( 20 ) ) . unwrap ( ) ;
145
+ result. insert ( shard_id, ( ) ) ;
146
+ }
147
+ println ! ( "result.len() = {}" , result. len( ) ) ;
148
+ assert ! ( result. len( ) > 100 )
149
+ }
150
+ }
0 commit comments