Skip to content
This repository was archived by the owner on Jun 21, 2023. It is now read-only.

Commit 8222276

Browse files
authored
Issue#376 (#182)
* Refactored .send() on websocket session
1 parent 7c33679 commit 8222276

File tree

7 files changed

+364
-34
lines changed

7 files changed

+364
-34
lines changed

services-gateway-netty/src/main/java/io/scalecube/services/gateway/ws/WebsocketGatewayAcceptor.java

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -188,28 +188,28 @@ private void onRequest(WebsocketGatewaySession session, ServiceMessage request,
188188
final Flux<ServiceMessage> serviceStream = serviceCall.requestMany(request);
189189

190190
Disposable disposable =
191-
Optional.ofNullable(request.header(RATE_LIMIT_FIELD))
192-
.map(Integer::valueOf)
193-
.map(serviceStream::limitRate)
194-
.orElse(serviceStream)
195-
.map(
196-
response -> {
197-
boolean isErrorResponse = false;
198-
if (response.isError()) {
199-
receivedError.set(true);
200-
isErrorResponse = true;
201-
}
202-
return newResponseMessage(sid, response, isErrorResponse);
203-
})
204-
.flatMap(session::send)
191+
session
192+
.send(
193+
Optional.ofNullable(request.header(RATE_LIMIT_FIELD))
194+
.map(Integer::valueOf)
195+
.map(serviceStream::limitRate)
196+
.orElse(serviceStream)
197+
.map(
198+
response -> {
199+
boolean isErrorResponse = response.isError();
200+
if (isErrorResponse) {
201+
receivedError.set(true);
202+
}
203+
return newResponseMessage(sid, response, isErrorResponse);
204+
}))
205205
.doOnError(th -> ReferenceCountUtil.safestRelease(request.data()))
206206
.doOnError(
207207
th ->
208208
session
209209
.send(toErrorResponse(errorMapper, request, th))
210210
.contextWrite(context)
211211
.subscribe())
212-
.doOnComplete(
212+
.doOnTerminate(
213213
() -> {
214214
if (!receivedError.get()) {
215215
session

services-gateway-netty/src/main/java/io/scalecube/services/gateway/ws/WebsocketGatewaySession.java

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -86,18 +86,38 @@ public Flux<ByteBuf> receive() {
8686
* @return mono void
8787
*/
8888
public Mono<Void> send(ServiceMessage response) {
89+
return Mono.deferContextual(
90+
context -> {
91+
final TextWebSocketFrame frame = new TextWebSocketFrame(codec.encode(response));
92+
gatewayHandler.onResponse(this, frame.content(), response, (Context) context);
93+
// send with publisher (defer buffer cleanup to netty)
94+
return outbound
95+
.sendObject(frame)
96+
.then()
97+
.doOnError(th -> gatewayHandler.onError(this, th, (Context) context));
98+
});
99+
}
100+
101+
/**
102+
* Method to send normal response.
103+
*
104+
* @param messages messages
105+
* @return mono void
106+
*/
107+
public Mono<Void> send(Flux<ServiceMessage> messages) {
89108
return Mono.deferContextual(
90109
context -> {
91110
// send with publisher (defer buffer cleanup to netty)
92111
return outbound
93112
.sendObject(
94-
Mono.just(response)
95-
.map(codec::encode)
96-
.map(TextWebSocketFrame::new)
97-
.doOnNext(
98-
frame ->
99-
gatewayHandler.onResponse(
100-
this, frame.content(), response, (Context) context)),
113+
messages.map(
114+
response -> {
115+
final TextWebSocketFrame frame =
116+
new TextWebSocketFrame(codec.encode(response));
117+
gatewayHandler.onResponse(
118+
this, frame.content(), response, (Context) context);
119+
return frame;
120+
}),
101121
f -> true)
102122
.then()
103123
.doOnError(th -> gatewayHandler.onError(this, th, (Context) context));
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package io.scalecube.services.gateway.websocket;
2+
3+
import org.slf4j.Logger;
4+
import org.slf4j.LoggerFactory;
5+
import reactor.core.CoreSubscriber;
6+
7+
public class CancelledSubscriber implements CoreSubscriber {
8+
9+
private static final Logger LOGGER = LoggerFactory.getLogger(CancelledSubscriber.class);
10+
11+
public static final CancelledSubscriber INSTANCE = new CancelledSubscriber();
12+
13+
private CancelledSubscriber() {
14+
// Do not instantiate
15+
}
16+
17+
@Override
18+
public void onSubscribe(org.reactivestreams.Subscription s) {
19+
// no-op
20+
}
21+
22+
@Override
23+
public void onNext(Object o) {
24+
LOGGER.warn("Received ({}) which will be dropped immediately due cancelled aeron inbound", o);
25+
}
26+
27+
@Override
28+
public void onError(Throwable t) {
29+
// no-op
30+
}
31+
32+
@Override
33+
public void onComplete() {
34+
// no-op
35+
}
36+
}
Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
package io.scalecube.services.gateway.websocket;
2+
3+
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
4+
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
5+
import org.reactivestreams.Subscription;
6+
import org.slf4j.Logger;
7+
import org.slf4j.LoggerFactory;
8+
import reactor.core.CoreSubscriber;
9+
import reactor.core.Exceptions;
10+
import reactor.core.publisher.BaseSubscriber;
11+
import reactor.core.publisher.Flux;
12+
import reactor.core.publisher.Operators;
13+
14+
public final class ReactiveAdapter extends BaseSubscriber<Object> implements ReactiveOperator {
15+
16+
private static final Logger LOGGER = LoggerFactory.getLogger(ReactiveAdapter.class);
17+
18+
private static final AtomicLongFieldUpdater<ReactiveAdapter> REQUESTED =
19+
AtomicLongFieldUpdater.newUpdater(ReactiveAdapter.class, "requested");
20+
21+
@SuppressWarnings("rawtypes")
22+
private static final AtomicReferenceFieldUpdater<ReactiveAdapter, CoreSubscriber>
23+
DESTINATION_SUBSCRIBER =
24+
AtomicReferenceFieldUpdater.newUpdater(
25+
ReactiveAdapter.class, CoreSubscriber.class, "destinationSubscriber");
26+
27+
private final FluxReceive inbound = new FluxReceive();
28+
29+
private volatile long requested;
30+
private volatile boolean fastPath;
31+
private long produced;
32+
private volatile CoreSubscriber<? super Object> destinationSubscriber;
33+
private Throwable lastError;
34+
35+
@Override
36+
public boolean isDisposed() {
37+
return destinationSubscriber == CancelledSubscriber.INSTANCE;
38+
}
39+
40+
@Override
41+
public void dispose(Throwable throwable) {
42+
Subscription upstream = upstream();
43+
if (upstream != null) {
44+
upstream.cancel();
45+
}
46+
CoreSubscriber<?> destination =
47+
DESTINATION_SUBSCRIBER.getAndSet(this, CancelledSubscriber.INSTANCE);
48+
if (destination != null) {
49+
destination.onError(throwable);
50+
}
51+
}
52+
53+
@Override
54+
public void dispose() {
55+
inbound.cancel();
56+
}
57+
58+
public Flux<Object> receive() {
59+
return inbound;
60+
}
61+
62+
@Override
63+
public void lastError(Throwable throwable) {
64+
lastError = throwable;
65+
}
66+
67+
@Override
68+
public Throwable lastError() {
69+
return lastError;
70+
}
71+
72+
@Override
73+
public void tryNext(Object Object) {
74+
if (!isDisposed()) {
75+
destinationSubscriber.onNext(Object);
76+
} else {
77+
LOGGER.warn("[tryNext] reactiveAdapter is disposed, dropping : " + Object);
78+
}
79+
}
80+
81+
@Override
82+
public boolean isFastPath() {
83+
return fastPath;
84+
}
85+
86+
@Override
87+
public void commitProduced() {
88+
if (produced > 0) {
89+
Operators.produced(REQUESTED, this, produced);
90+
produced = 0;
91+
}
92+
}
93+
94+
@Override
95+
public long incrementProduced() {
96+
return ++produced;
97+
}
98+
99+
@Override
100+
public long requested(long limit) {
101+
return Math.min(requested, limit);
102+
}
103+
104+
@Override
105+
protected void hookOnSubscribe(Subscription subscription) {
106+
subscription.request(requested);
107+
}
108+
109+
@Override
110+
protected void hookOnNext(Object Object) {
111+
tryNext(Object);
112+
}
113+
114+
@Override
115+
protected void hookOnComplete() {
116+
dispose();
117+
}
118+
119+
@Override
120+
protected void hookOnError(Throwable throwable) {
121+
dispose(throwable);
122+
}
123+
124+
@Override
125+
protected void hookOnCancel() {
126+
dispose();
127+
}
128+
129+
class FluxReceive extends Flux<Object> implements Subscription {
130+
131+
@Override
132+
public void request(long n) {
133+
Subscription upstream = upstream();
134+
if (upstream != null) {
135+
upstream.request(n);
136+
}
137+
if (fastPath) {
138+
return;
139+
}
140+
if (n == Long.MAX_VALUE) {
141+
fastPath = true;
142+
requested = Long.MAX_VALUE;
143+
return;
144+
}
145+
Operators.addCap(REQUESTED, ReactiveAdapter.this, n);
146+
}
147+
148+
@Override
149+
public void cancel() {
150+
Subscription upstream = upstream();
151+
if (upstream != null) {
152+
upstream.cancel();
153+
}
154+
CoreSubscriber<?> destination =
155+
DESTINATION_SUBSCRIBER.getAndSet(ReactiveAdapter.this, CancelledSubscriber.INSTANCE);
156+
if (destination != null) {
157+
destination.onComplete();
158+
}
159+
}
160+
161+
@Override
162+
public void subscribe(CoreSubscriber<? super Object> destinationSubscriber) {
163+
boolean result =
164+
DESTINATION_SUBSCRIBER.compareAndSet(ReactiveAdapter.this, null, destinationSubscriber);
165+
if (result) {
166+
destinationSubscriber.onSubscribe(this);
167+
} else {
168+
Operators.error(
169+
destinationSubscriber,
170+
isDisposed()
171+
? Exceptions.failWithCancel()
172+
: Exceptions.duplicateOnSubscribeException());
173+
}
174+
}
175+
}
176+
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package io.scalecube.services.gateway.websocket;
2+
3+
import reactor.core.Disposable;
4+
5+
public interface ReactiveOperator extends Disposable {
6+
7+
void dispose(Throwable throwable);
8+
9+
void lastError(Throwable throwable);
10+
11+
Throwable lastError();
12+
13+
void tryNext(Object fragment);
14+
15+
boolean isFastPath();
16+
17+
void commitProduced();
18+
19+
long incrementProduced();
20+
21+
long requested(long limit);
22+
}

0 commit comments

Comments
 (0)