Skip to content

Commit e9c435a

Browse files
authored
Merge pull request #24 from atsign-foundation/fix-2.3.2
fix: stability under load
2 parents afa33e0 + df601ce commit e9c435a

File tree

3 files changed

+28
-9
lines changed

3 files changed

+28
-9
lines changed

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,9 @@
1+
## 2.3.2
2+
3+
- fix: stability under load
4+
- ensure that socket.done is handled in all cases
5+
- check a side's state before attempting to write to that side's socket
6+
17
## 2.3.1
28

39
- fix: correctly handle situation where a socket has been closed but the other

lib/src/socket_connector.dart

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -136,23 +136,31 @@ class SocketConnector {
136136
'Added connection. There are now ${connections.length} connections.'));
137137

138138
for (final side in [thisSide, thisSide.farSide!]) {
139+
unawaited(side.socket.done
140+
.then((v) => _destroySide(side))
141+
.catchError((err) => _destroySide(side)));
139142
if (side.transformer != null) {
140143
// transformer is there to transform data originating FROM its side
141144
StreamController<Uint8List> sc = StreamController<Uint8List>();
142145
side.farSide!.sink = sc;
143146
Stream<List<int>> transformed = side.transformer!(sc.stream);
144-
transformed.listen((event) async {
147+
transformed.listen((data) {
145148
try {
146-
side.farSide!.socket.add(event);
147-
await side.farSide!.socket.flush();
148-
} catch (e) {
149+
if (side.farSide!.state == SideState.open) {
150+
side.farSide!.socket.add(data);
151+
} else {
152+
throw StateError(
153+
'Will not write to side ${side.farSide!.name} as its state is ${side.farSide!.state}');
154+
}
155+
} catch (e, st) {
149156
_log('Failed to write to side ${side.farSide!.name} - closing',
150157
force: true);
158+
_log('(Error was $e; Stack trace follows\n$st');
151159
_destroySide(side.farSide!);
152160
}
153161
});
154162
}
155-
side.stream.listen((Uint8List data) async {
163+
side.stream.listen((Uint8List data) {
156164
if (logTraffic) {
157165
final message = String.fromCharCodes(data);
158166
if (side.isSideA) {
@@ -164,11 +172,16 @@ class SocketConnector {
164172
}
165173
}
166174
try {
167-
side.farSide!.sink.add(data);
168-
await side.farSide!.socket.flush();
169-
} catch (e) {
175+
if (side.farSide!.state == SideState.open) {
176+
side.farSide!.sink.add(data);
177+
} else {
178+
throw StateError(
179+
'Will not write to side ${side.farSide!.name} as its state is ${side.farSide!.state}');
180+
}
181+
} catch (e, st) {
170182
_log('Failed to write to side ${side.farSide!.name} - closing',
171183
force: true);
184+
_log('(Error was $e; Stack trace follows\n$st');
172185
_destroySide(side.farSide!);
173186
}
174187
}, onDone: () {

pubspec.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
name: socket_connector
22
description: Package for joining sockets together to create socket relays.
33

4-
version: 2.3.1
4+
version: 2.3.2
55
repository: https://github.com/cconstab/socket_connector
66

77
environment:

0 commit comments

Comments
 (0)