@@ -5,13 +5,44 @@ import TTLVCodec from './codec/ttlv';
5
5
import TlsTransport from './transport/tls' ;
6
6
import KMIPClient , { KmipClientOptions } from './Client' ;
7
7
import { KmsBackend , KMSInterface , KmsType } from '../KMSInterface' ;
8
- import type { Logger } from 'werelogs' ;
8
+ import { Logger } from 'werelogs' ;
9
9
import async from 'async' ;
10
+ import { ArsenalError , errorInstances } from '../../errors' ;
11
+ import { kmipMsg } from './errorMapping' ;
12
+
13
+ interface UnhealthyClient {
14
+ client : KMIPClient ,
15
+ healthcheckTimeout : ReturnType < typeof setTimeout > ,
16
+ } ;
17
+
18
+ /** Array without last item */
19
+ type ArrayPopped < Type extends unknown [ ] > = Type extends [ ...infer R , unknown ] ? R : never ;
20
+ /** Array's last item */
21
+ type ArrayLast < Type extends unknown [ ] > = Type extends [ ...unknown [ ] , infer R ] ? R : never
22
+
23
+ interface actions {
24
+ createBucketKey : Parameters < ClusterClient [ 'createBucketKey' ] > ;
25
+ destroyBucketKey : Parameters < ClusterClient [ 'destroyBucketKey' ] > ;
26
+ cipherDataKey : Parameters < ClusterClient [ 'cipherDataKey' ] > ;
27
+ decipherDataKey : Parameters < ClusterClient [ 'decipherDataKey' ] > ;
28
+ healthcheck : Parameters < ClusterClient [ 'healthcheck' ] > ;
29
+ } ;
30
+
31
+ interface ClusterClientOptions {
32
+ logger : Logger ;
33
+ retries ?: number ;
34
+ } ;
35
+
36
+ const UNHEALTHY_DURATION = 30_000 ; // 30s
10
37
11
38
export default class ClusterClient implements KMSInterface {
39
+ /** Healthy clients */
12
40
private readonly clients : KMIPClient [ ] ;
41
+ private readonly unhealthyClients : UnhealthyClient [ ] = [ ] ;
13
42
private roundRobinIndex = 0 ;
14
43
public readonly backend : KmsBackend < KmsType . external > ;
44
+ private readonly logger : Logger ;
45
+ private readonly retries : number ;
15
46
16
47
/**
17
48
* Construct a high level cluster of KMIP drivers suitable for cloudserver
@@ -36,7 +67,7 @@ export default class ClusterClient implements KMSInterface {
36
67
* defaults to TlsTransport
37
68
*/
38
69
constructor (
39
- options : KmipClientOptions ,
70
+ options : KmipClientOptions & ClusterClientOptions ,
40
71
CodecClass : any ,
41
72
TransportClass : any ,
42
73
) {
@@ -47,6 +78,9 @@ export default class ClusterClient implements KMSInterface {
47
78
TransportClass || TlsTransport ,
48
79
) ) ;
49
80
this . backend = this . clients [ 0 ] . backend ;
81
+ this . logger = options . logger ;
82
+ // if retries is not configured, we retry as much host there are in the cluster
83
+ this . retries = typeof options . retries === 'number' ? options . retries : this . clients . length - 1 ;
50
84
}
51
85
52
86
next ( ) {
@@ -56,25 +90,174 @@ export default class ClusterClient implements KMSInterface {
56
90
return this . clients [ this . roundRobinIndex ++ ] ;
57
91
}
58
92
93
+ checkUnhealthyClient ( unhealthyClient : UnhealthyClient ) {
94
+ const client = unhealthyClient . client ;
95
+ const healthyIndex = this . clients . indexOf ( client ) ;
96
+ if ( healthyIndex === - 1 ) {
97
+ this . clients . push ( client ) ;
98
+ } else {
99
+ // should not happen (S3C-9956)
100
+ this . logger . warn ( 'checkUnhealthyClient: already moved in healthy' , { unhealthyHost : client . host } ) ;
101
+ }
102
+
103
+ const unhealthyIndex = this . unhealthyClients . indexOf ( unhealthyClient ) ;
104
+ if ( unhealthyIndex === - 1 ) {
105
+ // should not happen (S3C-9956)
106
+ this . logger . warn ( 'checkUnhealthyClient: already moved out of unhealthy' , { unhealthyHost : client . host } ) ;
107
+ } else {
108
+ this . unhealthyClients . splice ( unhealthyIndex , 1 ) ;
109
+ }
110
+ this . logger . info ( 'kmip host healthy again' , {
111
+ unhealthyHost : client . host ,
112
+ unhealthy : this . unhealthyClients . length ,
113
+ healthy : this . clients . length ,
114
+ } ) ;
115
+ }
116
+
117
+ markUnhealthyClient ( clientUsed : KMIPClient , logger : Logger , err : Error ) {
118
+ logger . info ( 'mark kmip host unhealthy' , {
119
+ err,
120
+ msg : err ?. toString ?.( ) ,
121
+ unhealthyHost : clientUsed . host ,
122
+ unhealthy : this . unhealthyClients . length ,
123
+ healthy : this . clients . length ,
124
+ } ) ;
125
+ const index = this . clients . indexOf ( clientUsed ) ;
126
+ if ( index === - 1 ) {
127
+ // should not happen (S3C-9956)
128
+ logger . warn ( 'already marked unhealthy' ) ;
129
+ return ;
130
+ }
131
+ const spliced = this . clients . splice ( index , 1 ) ;
132
+ const unhealthy = {
133
+ client : spliced [ 0 ] ,
134
+ healthcheckTimeout : setTimeout ( ( ) => this . checkUnhealthyClient ( unhealthy ) , UNHEALTHY_DURATION ) ,
135
+ } ;
136
+ this . unhealthyClients . push ( unhealthy ) ;
137
+ // the current index was removed
138
+ // decrease so next roundrobin uses current index again with new client
139
+ this . roundRobinIndex -- ;
140
+ }
141
+
142
+ callback < T extends keyof actions > (
143
+ clientUsed : KMIPClient ,
144
+ funcName : T ,
145
+ args : ArrayPopped < actions [ T ] > ,
146
+ logger : ArrayLast < typeof args > ,
147
+ originalCallback : ArrayLast < actions [ T ] > ,
148
+ ) {
149
+ let retries = 0 ;
150
+ const newCB = ( err : ( ArsenalError | Error | null ) & { code ?: number } , ...rest : any [ ] ) => {
151
+ if ( ! err ) {
152
+ // @ts -expect-error ts2556 typescript won't accept the spread on args array
153
+ return originalCallback ( err , ...rest ) ;
154
+ }
155
+ if ( err . code && err . code >= 400 && err . code <= 499 ) {
156
+ // @ts -expect-error ts2556 typescript won't accept the spread on args array
157
+ return originalCallback ( err , ...rest ) ;
158
+ }
159
+
160
+ if ( retries === this . retries ) {
161
+ logger . warn ( `kmip max retries reached: ${ retries } / ${ this . retries } ` ) ;
162
+ // @ts -expect-error ts2556 typescript won't accept the spread on args array
163
+ return originalCallback ( err , ...rest ) ;
164
+ }
165
+ retries ++ ;
166
+
167
+ this . markUnhealthyClient ( clientUsed , logger , err ) ;
168
+
169
+ if ( this . clients . length === 0 ) {
170
+ logger . warn ( 'kmip cluster has no healthy hosts' ) ;
171
+ // @ts -expect-error ts2556 typescript won't accept the spread on args array
172
+ return originalCallback ( err , ...rest ) ;
173
+ }
174
+
175
+ const nextClient = this . next ( ) ;
176
+ // @ts -expect-error ts2556 typescript won't accept the spread on args array
177
+ nextClient [ funcName ] ( ...args , newCB ) ;
178
+ return undefined ;
179
+ } ;
180
+ return newCB ;
181
+ }
59
182
60
183
createBucketKey ( ...args : Parameters < KMSInterface [ 'createBucketKey' ] > ) {
184
+ const originalCallback = args . pop ( ) as ArrayLast < typeof args > ;
185
+ const poppedArgs = args as unknown as ArrayPopped < typeof args > ;
186
+ const logger = args [ args . length - 1 ] as ArrayLast < typeof poppedArgs > ;
187
+
188
+ if ( this . clients . length === 0 ) {
189
+ logger . warn ( 'kmip cluster has no healthy hosts' ) ;
190
+ return originalCallback ( errorInstances . InternalError . customizeDescription (
191
+ kmipMsg ( 'Create' , args [ 0 ] , `no healthy host in the cluster` ) ) ) ;
192
+ }
193
+
61
194
const client = this . next ( ) ;
62
- client . createBucketKey . apply ( client , args ) ;
195
+
196
+ client . createBucketKey (
197
+ ...poppedArgs ,
198
+ this . callback ( client , 'createBucketKey' , poppedArgs , logger , originalCallback ) ,
199
+ ) ;
200
+ return undefined ;
63
201
}
64
202
65
203
destroyBucketKey ( ...args : Parameters < KMSInterface [ 'destroyBucketKey' ] > ) {
204
+ const originalCallback = args . pop ( ) as ArrayLast < typeof args > ;
205
+ const poppedArgs = args as unknown as ArrayPopped < typeof args > ;
206
+ const logger = args [ args . length - 1 ] as ArrayLast < typeof poppedArgs > ;
207
+
208
+ if ( this . clients . length === 0 ) {
209
+ logger . warn ( 'kmip cluster has no healthy hosts' ) ;
210
+ return originalCallback ( errorInstances . InternalError . customizeDescription (
211
+ kmipMsg ( 'Destroy' , args [ 0 ] , `no healthy host in the cluster` ) ) ) ;
212
+ }
213
+
66
214
const client = this . next ( ) ;
67
- client . destroyBucketKey . apply ( client , args ) ;
215
+
216
+ client . destroyBucketKey (
217
+ ...poppedArgs ,
218
+ this . callback ( client , 'destroyBucketKey' , poppedArgs , logger , originalCallback ) ,
219
+ ) ;
220
+ return undefined ;
68
221
}
69
222
70
223
cipherDataKey ( ...args : Parameters < KMSInterface [ 'cipherDataKey' ] > ) {
224
+ const originalCallback = args . pop ( ) as ArrayLast < typeof args > ;
225
+ const poppedArgs = args as unknown as ArrayPopped < typeof args > ;
226
+ const logger = args [ args . length - 1 ] as ArrayLast < typeof poppedArgs > ;
227
+
228
+ if ( this . clients . length === 0 ) {
229
+ logger . warn ( 'kmip cluster has no healthy hosts' ) ;
230
+ return originalCallback ( errorInstances . InternalError . customizeDescription (
231
+ kmipMsg ( 'Encrypt' , args [ 1 ] , `no healthy host in the cluster` ) ) ) ;
232
+ }
233
+
71
234
const client = this . next ( ) ;
72
- client . cipherDataKey . apply ( client , args ) ;
235
+
236
+ client . cipherDataKey (
237
+ ...poppedArgs ,
238
+ this . callback ( client , 'cipherDataKey' , poppedArgs , logger , originalCallback ) ,
239
+ ) ;
240
+ return undefined ;
73
241
}
74
242
75
243
decipherDataKey ( ...args : Parameters < KMSInterface [ 'decipherDataKey' ] > ) {
244
+ const originalCallback = args . pop ( ) as ArrayLast < typeof args > ;
245
+ const poppedArgs = args as unknown as ArrayPopped < typeof args > ;
246
+ const logger = args [ args . length - 1 ] as ArrayLast < typeof poppedArgs > ;
247
+
248
+ if ( this . clients . length === 0 ) {
249
+ logger . warn ( 'kmip cluster has no healthy hosts' ) ;
250
+ return originalCallback ( errorInstances . InternalError . customizeDescription (
251
+ kmipMsg ( 'Decrypt' , args [ 1 ] , `no healthy host in the cluster` ) ) ) ;
252
+ }
253
+
76
254
const client = this . next ( ) ;
77
- client . decipherDataKey . apply ( client , args ) ;
255
+
256
+ client . decipherDataKey (
257
+ ...poppedArgs ,
258
+ this . callback ( client , 'decipherDataKey' , poppedArgs , logger , originalCallback ) ,
259
+ ) ;
260
+ return undefined ;
78
261
}
79
262
80
263
clusterHealthcheck ( logger : Logger , cb : ( err : Error | null ) => void ) {
0 commit comments