@@ -27,6 +27,7 @@ class SocketConnector {
27
27
this .verbose = false ,
28
28
this .logTraffic = false ,
29
29
this .timeout = defaultTimeout,
30
+ this .authTimeout = defaultTimeout,
30
31
IOSink ? logger,
31
32
}) {
32
33
this .logger = logger ?? stderr;
@@ -82,6 +83,9 @@ class SocketConnector {
82
83
83
84
final Completer _closedCompleter = Completer ();
84
85
86
+ /// How long to wait for a client to authenticate its self
87
+ final Duration authTimeout;
88
+
85
89
/// Add a [Side] with optional [SocketAuthVerifier] and
86
90
/// [DataTransformer]
87
91
/// - If [socketAuthVerifier] provided, wait for socket to be authenticated
@@ -101,7 +105,7 @@ class SocketConnector {
101
105
try {
102
106
(authenticated, stream) = await thisSide.socketAuthVerifier!
103
107
(thisSide.socket)
104
- .timeout (Duration (seconds : 5 ) );
108
+ .timeout (authTimeout );
105
109
thisSide.authenticated = authenticated;
106
110
if (thisSide.authenticated) {
107
111
thisSide.stream = stream! ;
@@ -128,6 +132,8 @@ class SocketConnector {
128
132
if (pendingA.isNotEmpty && pendingB.isNotEmpty) {
129
133
Connection c = Connection (pendingA.removeAt (0 ), pendingB.removeAt (0 ));
130
134
connections.add (c);
135
+ _log (chalk.brightBlue (
136
+ 'Added connection. There are now ${connections .length } connections.' ));
131
137
132
138
for (final side in [thisSide, thisSide.farSide! ]) {
133
139
if (side.transformer != null ) {
@@ -165,37 +171,35 @@ class SocketConnector {
165
171
return ;
166
172
}
167
173
side.state = SideState .closing;
174
+ Connection ? connectionToRemove;
175
+ for (final c in connections) {
176
+ if (c.sideA == side || c.sideB == side) {
177
+ _log (chalk.brightBlue ('Will remove established connection' ));
178
+ connectionToRemove = c;
179
+ break ;
180
+ }
181
+ }
182
+ if (connectionToRemove != null ) {
183
+ connections.remove (connectionToRemove);
184
+ _log (chalk
185
+ .brightBlue ('Removed connection. ${connections .length } remaining.' ));
186
+ if (connections.isEmpty && gracePeriodPassed) {
187
+ _log (chalk.brightBlue ('No established connections remain'
188
+ ' and grace period has passed - '
189
+ ' will close connector' ));
190
+ close ();
191
+ }
192
+ }
193
+ side.state = SideState .closed;
168
194
try {
169
195
_log (chalk.brightBlue ('Destroying socket on side ${side .name }' ));
170
196
side.socket.destroy ();
171
197
if (side.farSide != null ) {
172
198
_log (chalk.brightBlue (
173
199
'Destroying socket on far side (${side .farSide ?.name })' ));
174
- side.farSide? .socket.destroy ();
175
- }
176
-
177
- Connection ? connectionToRemove;
178
- for (final c in connections) {
179
- if (c.sideA == side || c.sideB == side) {
180
- _log (chalk.brightBlue ('Will remove established connection' ));
181
- connectionToRemove = c;
182
- break ;
183
- }
200
+ _destroySide (side.farSide! );
184
201
}
185
- if (connectionToRemove != null ) {
186
- connections.remove (connectionToRemove);
187
- _log (chalk.brightBlue ('Removed connection' ));
188
- if (connections.isEmpty && gracePeriodPassed) {
189
- _log (chalk.brightBlue ('No established connections remain'
190
- ' and grace period has passed - '
191
- ' will close connector' ));
192
- close ();
193
- }
194
- }
195
- } catch (_) {
196
- } finally {
197
- side.state = SideState .closed;
198
- }
202
+ } catch (_) {}
199
203
}
200
204
201
205
void close () {
@@ -241,7 +245,9 @@ class SocketConnector {
241
245
SocketAuthVerifier ? socketAuthVerifierA,
242
246
SocketAuthVerifier ? socketAuthVerifierB,
243
247
Duration timeout = SocketConnector .defaultTimeout,
248
+ Duration authTimeout = SocketConnector .defaultTimeout,
244
249
IOSink ? logger,
250
+ int backlog = 0 ,
245
251
}) async {
246
252
IOSink logSink = logger ?? stderr;
247
253
addressA ?? = InternetAddress .anyIPv4;
@@ -251,10 +257,19 @@ class SocketConnector {
251
257
verbose: verbose,
252
258
logTraffic: logTraffic,
253
259
timeout: timeout,
260
+ authTimeout: authTimeout,
254
261
logger: logSink,
255
262
);
256
- connector._serverSocketA = await ServerSocket .bind (addressA, portA);
257
- connector._serverSocketB = await ServerSocket .bind (addressB, portB);
263
+ connector._serverSocketA = await ServerSocket .bind (
264
+ addressA,
265
+ portA,
266
+ backlog: backlog,
267
+ );
268
+ connector._serverSocketB = await ServerSocket .bind (
269
+ addressB,
270
+ portB,
271
+ backlog: backlog,
272
+ );
258
273
if (verbose) {
259
274
logSink.writeln (
260
275
'${DateTime .now ()} | serverToServer | Bound ports A: ${connector .sideAPort }, B: ${connector .sideBPort }' );
@@ -269,7 +284,18 @@ class SocketConnector {
269
284
'${DateTime .now ()} | serverToServer | Connection on serverSocketA: ${connector ._serverSocketA !.port }' );
270
285
}
271
286
Side sideA = Side (socket, true , socketAuthVerifier: socketAuthVerifierA);
272
- unawaited (connector.handleSingleConnection (sideA));
287
+ unawaited (connector.handleSingleConnection (sideA).catchError ((err) {
288
+ logSink
289
+ .writeln ('ERROR $err from handleSingleConnection on sideA $sideA ' );
290
+ }));
291
+ }, onError: (error) {
292
+ logSink.writeln (
293
+ '${DateTime .now ()} | serverToServer | ERROR on serverSocketA: ${connector ._serverSocketA ?.port } : $error ' );
294
+ connector.close ();
295
+ }, onDone: () {
296
+ logSink.writeln (
297
+ '${DateTime .now ()} | serverToServer | onDone called on serverSocketA: ${connector ._serverSocketA ?.port }' );
298
+ connector.close ();
273
299
});
274
300
275
301
// listen for connections to the side 'B' server
@@ -279,7 +305,18 @@ class SocketConnector {
279
305
'${DateTime .now ()} | serverToServer | Connection on serverSocketB: ${connector ._serverSocketB !.port }' );
280
306
}
281
307
Side sideB = Side (socket, false , socketAuthVerifier: socketAuthVerifierB);
282
- unawaited (connector.handleSingleConnection (sideB));
308
+ unawaited (connector.handleSingleConnection (sideB).catchError ((err) {
309
+ logSink
310
+ .writeln ('ERROR $err from handleSingleConnection on sideB $sideB ' );
311
+ }));
312
+ }, onError: (error) {
313
+ logSink.writeln (
314
+ '${DateTime .now ()} | serverToServer | ERROR on serverSocketB: ${connector ._serverSocketB ?.port } : $error ' );
315
+ connector.close ();
316
+ }, onDone: () {
317
+ logSink.writeln (
318
+ '${DateTime .now ()} | serverToServer | onDone called on serverSocketB: ${connector ._serverSocketB ?.port }' );
319
+ connector.close ();
283
320
});
284
321
285
322
return (connector);
@@ -319,15 +356,20 @@ class SocketConnector {
319
356
// Create socket to an address and port
320
357
Socket socket = await Socket .connect (addressA, portA);
321
358
Side sideA = Side (socket, true , transformer: transformAtoB);
322
- unawaited (connector.handleSingleConnection (sideA));
359
+ unawaited (connector.handleSingleConnection (sideA).catchError ((err) {
360
+ logSink.writeln ('ERROR $err from handleSingleConnection on sideA $sideA ' );
361
+ }));
323
362
324
363
// bind to side 'B' port
325
364
connector._serverSocketB = await ServerSocket .bind (addressB, portB);
326
365
327
366
// listen for connections to the 'B' side port
328
367
connector._serverSocketB? .listen ((socketB) {
329
368
Side sideB = Side (socketB, false , transformer: transformBtoA);
330
- unawaited (connector.handleSingleConnection (sideB));
369
+ unawaited (connector.handleSingleConnection (sideB).catchError ((err) {
370
+ logSink
371
+ .writeln ('ERROR $err from handleSingleConnection on sideB $sideB ' );
372
+ }));
331
373
});
332
374
return (connector);
333
375
}
@@ -361,14 +403,18 @@ class SocketConnector {
361
403
}
362
404
Socket sideASocket = await Socket .connect (addressA, portA);
363
405
Side sideA = Side (sideASocket, true , transformer: transformAtoB);
364
- unawaited (connector.handleSingleConnection (sideA));
406
+ unawaited (connector.handleSingleConnection (sideA).catchError ((err) {
407
+ logSink.writeln ('ERROR $err from handleSingleConnection on sideA $sideA ' );
408
+ }));
365
409
366
410
if (verbose) {
367
411
logSink.writeln ('socket_connector: Connecting to $addressB :$portB ' );
368
412
}
369
413
Socket sideBSocket = await Socket .connect (addressB, portB);
370
414
Side sideB = Side (sideBSocket, false , transformer: transformBtoA);
371
- unawaited (connector.handleSingleConnection (sideB));
415
+ unawaited (connector.handleSingleConnection (sideB).catchError ((err) {
416
+ logSink.writeln ('ERROR $err from handleSingleConnection on sideB $sideB ' );
417
+ }));
372
418
373
419
if (verbose) {
374
420
logSink.writeln ('socket_connector: started' );
@@ -408,7 +454,8 @@ class SocketConnector {
408
454
bool multi = false ,
409
455
@Deprecated ("use beforeJoining instead" )
410
456
Function (Socket socketA, Socket socketB)? onConnect,
411
- Function (Side sideA, Side sideB)? beforeJoining}) async {
457
+ Function (Side sideA, Side sideB)? beforeJoining,
458
+ int backlog = 0 }) async {
412
459
IOSink logSink = logger ?? stderr;
413
460
addressA ?? = InternetAddress .anyIPv4;
414
461
@@ -421,27 +468,53 @@ class SocketConnector {
421
468
422
469
int connections = 0 ;
423
470
// bind to a local port for side 'A'
424
- connector._serverSocketA = await ServerSocket .bind (addressA, portA);
425
- // listen on the local port and connect the inbound socket
426
- connector._serverSocketA? .listen ((sideASocket) async {
427
- if (! multi) {
428
- unawaited (connector._serverSocketA? .close ());
429
- }
471
+ connector._serverSocketA = await ServerSocket .bind (
472
+ addressA,
473
+ portA,
474
+ backlog: backlog,
475
+ );
476
+
477
+ StreamController <Socket > ssc = StreamController ();
478
+ ssc.stream.listen ((sideASocket) async {
430
479
Side sideA = Side (sideASocket, true , transformer: transformAtoB);
431
- unawaited (connector.handleSingleConnection (sideA));
480
+ unawaited (connector.handleSingleConnection (sideA).catchError ((err) {
481
+ logSink
482
+ .writeln ('ERROR $err from handleSingleConnection on sideA $sideA ' );
483
+ }));
432
484
433
485
if (verbose) {
434
- logSink.writeln ('Making connection ${++connections } to the "B" side' );
486
+ logSink.writeln ('Creating socket # ${++connections } to the "B" side' );
435
487
}
436
488
// connect to the side 'B' address and port
437
489
Socket sideBSocket = await Socket .connect (addressB, portB);
490
+ if (verbose) {
491
+ logSink.writeln ('"B" side socket #$connections created' );
492
+ }
438
493
Side sideB = Side (sideBSocket, false , transformer: transformBtoA);
439
- beforeJoining? .call (sideA, sideB);
440
- unawaited (connector.handleSingleConnection (sideB));
494
+ if (verbose) {
495
+ logSink.writeln ('Calling the beforeJoining callback' );
496
+ }
497
+ await beforeJoining? .call (sideA, sideB);
498
+ unawaited (connector.handleSingleConnection (sideB).catchError ((err) {
499
+ logSink
500
+ .writeln ('ERROR $err from handleSingleConnection on sideB $sideB ' );
501
+ }));
441
502
442
503
onConnect? .call (sideASocket, sideBSocket);
443
504
});
444
505
506
+ // listen on the local port and connect the inbound socket
507
+ connector._serverSocketA? .listen ((sideASocket) {
508
+ if (! multi) {
509
+ try {
510
+ connector._serverSocketA? .close ();
511
+ } catch (e) {
512
+ logSink.writeln ('Error while closing serverSocketA: $e ' );
513
+ }
514
+ }
515
+ ssc.add (sideASocket);
516
+ });
517
+
445
518
return (connector);
446
519
}
447
520
}
0 commit comments