Skip to content

Commit 1453018

Browse files
authored
Merge pull request #25 from atsign-foundation/fix-2.3.3
fix: improve stability under load
2 parents e9c435a + f7e5cec commit 1453018

File tree

4 files changed

+108
-64
lines changed

4 files changed

+108
-64
lines changed

CHANGELOG.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,10 @@
1+
## 2.3.3
2+
3+
- fix: ensure serverToSocket handles sockets in strict sequence as they are
4+
accepted
5+
- fix: ensure that, when one side is closed, all data received from that side
6+
has been delivered to the other side before closing the other side
7+
18
## 2.3.2
29

310
- fix: stability under load

lib/src/socket_connector.dart

Lines changed: 93 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import 'dart:async';
22
import 'dart:io';
33
import 'dart:typed_data';
44
import 'package:chalkdart/chalk.dart';
5+
import 'package:mutex/mutex.dart';
56
import 'package:socket_connector/src/types.dart';
67

78
/// Typical usage is via the [serverToServer], [serverToSocket],
@@ -97,6 +98,9 @@ class SocketConnector {
9798
if (closed) {
9899
throw StateError('Connector is closed');
99100
}
101+
unawaited(thisSide.socket.done
102+
.then((v) => _closeSide(thisSide))
103+
.catchError((err) => _closeSide(thisSide)));
100104
if (thisSide.socketAuthVerifier == null) {
101105
thisSide.authenticated = true;
102106
} else {
@@ -119,7 +123,7 @@ class SocketConnector {
119123
}
120124
if (!thisSide.authenticated) {
121125
_log('Authentication failed on side ${thisSide.name}', force: true);
122-
_destroySide(thisSide);
126+
_closeSide(thisSide);
123127
return;
124128
}
125129

@@ -136,31 +140,34 @@ class SocketConnector {
136140
'Added connection. There are now ${connections.length} connections.'));
137141

138142
for (final side in [thisSide, thisSide.farSide!]) {
139-
unawaited(side.socket.done
140-
.then((v) => _destroySide(side))
141-
.catchError((err) => _destroySide(side)));
142143
if (side.transformer != null) {
143144
// transformer is there to transform data originating FROM its side
145+
// transformer's output will write to the SOCKET on the far side
144146
StreamController<Uint8List> sc = StreamController<Uint8List>();
145147
side.farSide!.sink = sc;
146148
Stream<List<int>> transformed = side.transformer!(sc.stream);
147-
transformed.listen((data) {
148-
try {
149-
if (side.farSide!.state == SideState.open) {
149+
transformed.listen(
150+
(data) {
151+
try {
150152
side.farSide!.socket.add(data);
151-
} else {
152-
throw StateError(
153-
'Will not write to side ${side.farSide!.name} as its state is ${side.farSide!.state}');
153+
side.farSide!.sent += data.length;
154+
if (side.state == SideState.closed &&
155+
side.rcvd == side.farSide!.sent) {
156+
_closeSide(side.farSide!);
157+
}
158+
} catch (e, st) {
159+
_log('Failed to write to side ${side.farSide!.name} - closing',
160+
force: true);
161+
_log('(Error was $e; Stack trace follows\n$st', force: true);
162+
_closeSide(side.farSide!);
154163
}
155-
} catch (e, st) {
156-
_log('Failed to write to side ${side.farSide!.name} - closing',
157-
force: true);
158-
_log('(Error was $e; Stack trace follows\n$st');
159-
_destroySide(side.farSide!);
160-
}
161-
});
164+
},
165+
onDone: () => _closeSide(side),
166+
onError: (error) => _closeSide(side),
167+
);
162168
}
163169
side.stream.listen((Uint8List data) {
170+
side.rcvd += data.length;
164171
if (logTraffic) {
165172
final message = String.fromCharCodes(data);
166173
if (side.isSideA) {
@@ -172,34 +179,41 @@ class SocketConnector {
172179
}
173180
}
174181
try {
175-
if (side.farSide!.state == SideState.open) {
176-
side.farSide!.sink.add(data);
177-
} else {
178-
throw StateError(
179-
'Will not write to side ${side.farSide!.name} as its state is ${side.farSide!.state}');
182+
side.farSide!.sink.add(data);
183+
if (side.farSide!.sink is Socket) {
184+
side.farSide!.sent += data.length;
185+
if (side.state == SideState.closed &&
186+
side.rcvd == side.farSide!.sent) {
187+
_closeSide(side.farSide!);
188+
}
180189
}
181190
} catch (e, st) {
182191
_log('Failed to write to side ${side.farSide!.name} - closing',
183192
force: true);
184-
_log('(Error was $e; Stack trace follows\n$st');
185-
_destroySide(side.farSide!);
193+
_log('(Error was $e; Stack trace follows\n$st', force: true);
194+
_closeSide(side.farSide!);
186195
}
187-
}, onDone: () {
188-
_log('stream.onDone on side ${side.name}');
189-
_destroySide(side);
196+
}, onDone: () async {
197+
_log('${side.stream.runtimeType}.onDone on side ${side.name}');
198+
_closeSide(side);
190199
}, onError: (error) {
191-
_log('stream.onError on side ${side.name}: $error', force: true);
192-
_destroySide(side);
200+
_log(
201+
'${side.stream.runtimeType}.onError on side ${side.name}: $error',
202+
force: true);
203+
_closeSide(side);
193204
});
194205
}
195206
}
196207
}
197208

198-
_destroySide(final Side side) {
209+
_closeSide(final Side side) async {
199210
if (side.state != SideState.open) {
200211
return;
201212
}
202-
side.state = SideState.closing;
213+
side.state = SideState.closed;
214+
215+
_log(chalk.brightBlue('_closeSide ${side.name}: RCVD: ${side.rcvd} bytes; SENT: ${side.sent} bytes'));
216+
203217
Connection? connectionToRemove;
204218
for (final c in connections) {
205219
if (c.sideA == side || c.sideB == side) {
@@ -219,16 +233,24 @@ class SocketConnector {
219233
close();
220234
}
221235
}
222-
side.state = SideState.closed;
236+
223237
try {
224238
_log(chalk.brightBlue('Destroying socket on side ${side.name}'));
239+
await side.socket.flush();
225240
side.socket.destroy();
226-
if (side.farSide != null) {
227-
_log(chalk.brightBlue(
228-
'Destroying socket on far side (${side.farSide?.name})'));
229-
_destroySide(side.farSide!);
241+
if (side.farSide != null && side.farSide!.state != SideState.closed) {
242+
if (side.rcvd == side.farSide!.sent) {
243+
_log(chalk.brightBlue(
244+
'Far side (${side.farSide?.name}) has received all data - will close it'));
245+
_closeSide(side.farSide!);
246+
} else {
247+
_log(chalk.brightBlue(
248+
'Far side (${side.farSide?.name}) has NOT YET received all data'));
249+
}
230250
}
231-
} catch (_) {}
251+
} catch (err) {
252+
_log('_closeSide encountered error $err');
253+
}
232254
}
233255

234256
void close() {
@@ -243,11 +265,11 @@ class SocketConnector {
243265
_log('closed');
244266
}
245267
for (final s in pendingA) {
246-
_destroySide(s);
268+
_closeSide(s);
247269
}
248270
pendingA.clear();
249271
for (final s in pendingB) {
250-
_destroySide(s);
272+
_closeSide(s);
251273
}
252274
pendingB.clear();
253275
}
@@ -504,32 +526,40 @@ class SocketConnector {
504526
);
505527

506528
StreamController<Socket> ssc = StreamController();
529+
Mutex m = Mutex();
507530
ssc.stream.listen((sideASocket) async {
508-
Side sideA = Side(sideASocket, true, transformer: transformAtoB);
509-
unawaited(connector.handleSingleConnection(sideA).catchError((err) {
510-
logSink
511-
.writeln('ERROR $err from handleSingleConnection on sideA $sideA');
512-
}));
513-
514-
if (verbose) {
515-
logSink.writeln('Creating socket #${++connections} to the "B" side');
516-
}
517-
// connect to the side 'B' address and port
518-
Socket sideBSocket = await Socket.connect(addressB, portB);
519-
if (verbose) {
520-
logSink.writeln('"B" side socket #$connections created');
521-
}
522-
Side sideB = Side(sideBSocket, false, transformer: transformBtoA);
523-
if (verbose) {
524-
logSink.writeln('Calling the beforeJoining callback');
531+
try {
532+
// It's important we handle these in sequence with no chance for race
533+
// So we're going to use a mutex
534+
await m.acquire();
535+
Side sideA = Side(sideASocket, true, transformer: transformAtoB);
536+
unawaited(connector.handleSingleConnection(sideA).catchError((err) {
537+
logSink.writeln(
538+
'ERROR $err from handleSingleConnection on sideA $sideA');
539+
}));
540+
541+
if (verbose) {
542+
logSink.writeln('Creating socket #${++connections} to the "B" side');
543+
}
544+
// connect to the side 'B' address and port
545+
Socket sideBSocket = await Socket.connect(addressB, portB);
546+
if (verbose) {
547+
logSink.writeln('"B" side socket #$connections created');
548+
}
549+
Side sideB = Side(sideBSocket, false, transformer: transformBtoA);
550+
if (verbose) {
551+
logSink.writeln('Calling the beforeJoining callback');
552+
}
553+
await beforeJoining?.call(sideA, sideB);
554+
unawaited(connector.handleSingleConnection(sideB).catchError((err) {
555+
logSink.writeln(
556+
'ERROR $err from handleSingleConnection on sideB $sideB');
557+
}));
558+
559+
onConnect?.call(sideASocket, sideBSocket);
560+
} finally {
561+
m.release();
525562
}
526-
await beforeJoining?.call(sideA, sideB);
527-
unawaited(connector.handleSingleConnection(sideB).catchError((err) {
528-
logSink
529-
.writeln('ERROR $err from handleSingleConnection on sideB $sideB');
530-
}));
531-
532-
onConnect?.call(sideASocket, sideBSocket);
533563
});
534564

535565
// listen on the local port and connect the inbound socket

lib/src/types.dart

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,12 @@ class Side {
6565
SocketAuthVerifier? socketAuthVerifier;
6666
DataTransformer? transformer;
6767

68+
/// number of bytes written to this side's socket
69+
int sent = 0;
70+
71+
/// number of bytes received from this side's socket
72+
int rcvd = 0;
73+
6874
String get name => isSideA ? 'A' : 'B';
6975
Side(this.socket, this.isSideA, {this.socketAuthVerifier, this.transformer}) {
7076
sink = socket;

pubspec.yaml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,15 @@
11
name: socket_connector
22
description: Package for joining sockets together to create socket relays.
33

4-
version: 2.3.2
4+
version: 2.3.3
55
repository: https://github.com/cconstab/socket_connector
66

77
environment:
88
sdk: '>=3.0.0 <4.0.0'
99

1010
dependencies:
1111
chalkdart: ^2.2.1
12+
mutex: ^3.1.0
1213

1314
dev_dependencies:
1415
lints: ^3.0.0

0 commit comments

Comments
 (0)