Skip to content
This repository was archived by the owner on Jun 21, 2023. It is now read-only.

Commit 5d9f4f4

Browse files
authored
Bump scalecube-commons version to .17, get to use RetryNonSerializedEmitFailureHandler from commons (#180)
1 parent 67981bf commit 5d9f4f4

File tree

5 files changed

+22
-66
lines changed

5 files changed

+22
-66
lines changed

pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858

5959
<properties>
6060
<scalecube-services.version>2.10.18</scalecube-services.version>
61+
<scalecube-commons.version>1.0.17</scalecube-commons.version>
6162

6263
<reactor.version>2020.0.6</reactor.version>
6364
<rsocket.version>1.0.4</rsocket.version>
@@ -110,6 +111,11 @@
110111
<artifactId>scalecube-services-bytebuf-codec</artifactId>
111112
<version>${scalecube-services.version}</version>
112113
</dependency>
114+
<dependency>
115+
<groupId>io.scalecube</groupId>
116+
<artifactId>scalecube-commons</artifactId>
117+
<version>${scalecube-commons.version}</version>
118+
</dependency>
113119

114120
<!-- Reactor -->
115121
<dependency>

services-gateway-client-transport/src/main/java/io/scalecube/services/gateway/transport/http/HttpGatewayClient.java

Lines changed: 3 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package io.scalecube.services.gateway.transport.http;
22

3-
import static reactor.core.publisher.Sinks.EmitResult.FAIL_NON_SERIALIZED;
3+
import static io.scalecube.reactor.RetryNonSerializedEmitFailureHandler.RETRY_NON_SERIALIZED;
44

55
import io.netty.buffer.ByteBuf;
66
import io.scalecube.services.api.ServiceMessage;
@@ -14,9 +14,7 @@
1414
import org.slf4j.LoggerFactory;
1515
import reactor.core.publisher.Flux;
1616
import reactor.core.publisher.Mono;
17-
import reactor.core.publisher.SignalType;
1817
import reactor.core.publisher.Sinks;
19-
import reactor.core.publisher.Sinks.EmitResult;
2018
import reactor.netty.NettyOutbound;
2119
import reactor.netty.http.client.HttpClient;
2220
import reactor.netty.http.client.HttpClientRequest;
@@ -63,7 +61,7 @@ public HttpGatewayClient(GatewayClientSettings settings, GatewayClientCodec<Byte
6361
close
6462
.asMono()
6563
.then(doClose())
66-
.doFinally(s -> onClose.emitEmpty(EmitFailureHandler.RETRY_NOT_SERIALIZED))
64+
.doFinally(s -> onClose.emitEmpty(RETRY_NON_SERIALIZED))
6765
.doOnTerminate(() -> LOGGER.info("Closed HttpGatewayClient resources"))
6866
.subscribe(null, ex -> LOGGER.warn("Exception occurred on HttpGatewayClient close: " + ex));
6967
}
@@ -104,7 +102,7 @@ public Flux<ServiceMessage> requestChannel(Flux<ServiceMessage> requests) {
104102

105103
@Override
106104
public void close() {
107-
close.emitEmpty(EmitFailureHandler.RETRY_NOT_SERIALIZED);
105+
close.emitEmpty(RETRY_NON_SERIALIZED);
108106
}
109107

110108
@Override
@@ -138,14 +136,4 @@ private ServiceMessage toMessage(HttpClientResponse httpResponse, ByteBuf conten
138136
private boolean isError(int httpCode) {
139137
return httpCode >= 400 && httpCode <= 599;
140138
}
141-
142-
private static class EmitFailureHandler implements Sinks.EmitFailureHandler {
143-
144-
private static final EmitFailureHandler RETRY_NOT_SERIALIZED = new EmitFailureHandler();
145-
146-
@Override
147-
public boolean onEmitFailure(SignalType signalType, EmitResult emitResult) {
148-
return emitResult == FAIL_NON_SERIALIZED;
149-
}
150-
}
151139
}

services-gateway-client-transport/src/main/java/io/scalecube/services/gateway/transport/rsocket/RSocketGatewayClient.java

Lines changed: 3 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package io.scalecube.services.gateway.transport.rsocket;
22

3-
import static reactor.core.publisher.Sinks.EmitResult.FAIL_NON_SERIALIZED;
3+
import static io.scalecube.reactor.RetryNonSerializedEmitFailureHandler.RETRY_NON_SERIALIZED;
44

55
import io.rsocket.Payload;
66
import io.rsocket.RSocket;
@@ -17,9 +17,7 @@
1717
import org.slf4j.LoggerFactory;
1818
import reactor.core.publisher.Flux;
1919
import reactor.core.publisher.Mono;
20-
import reactor.core.publisher.SignalType;
2120
import reactor.core.publisher.Sinks;
22-
import reactor.core.publisher.Sinks.EmitResult;
2321
import reactor.netty.http.client.HttpClient;
2422
import reactor.netty.resources.ConnectionProvider;
2523
import reactor.netty.resources.LoopResources;
@@ -57,7 +55,7 @@ public RSocketGatewayClient(GatewayClientSettings settings, GatewayClientCodec<P
5755
close
5856
.asMono()
5957
.then(doClose())
60-
.doFinally(s -> onClose.emitEmpty(EmitFailureHandler.RETRY_NOT_SERIALIZED))
58+
.doFinally(s -> onClose.emitEmpty(RETRY_NON_SERIALIZED))
6159
.doOnTerminate(() -> LOGGER.info("Closed RSocketGatewayClient resources"))
6260
.subscribe(
6361
null, ex -> LOGGER.warn("Exception occurred on RSocketGatewayClient close: " + ex));
@@ -99,7 +97,7 @@ public Flux<ServiceMessage> requestChannel(Flux<ServiceMessage> requests) {
9997

10098
@Override
10199
public void close() {
102-
close.emitEmpty(EmitFailureHandler.RETRY_NOT_SERIALIZED);
100+
close.emitEmpty(RETRY_NON_SERIALIZED);
103101
}
104102

105103
@Override
@@ -185,14 +183,4 @@ private ServiceMessage toMessage(Payload payload) {
185183
LOGGER.debug("Received response {}", message);
186184
return message;
187185
}
188-
189-
private static class EmitFailureHandler implements Sinks.EmitFailureHandler {
190-
191-
private static final EmitFailureHandler RETRY_NOT_SERIALIZED = new EmitFailureHandler();
192-
193-
@Override
194-
public boolean onEmitFailure(SignalType signalType, EmitResult emitResult) {
195-
return emitResult == FAIL_NON_SERIALIZED;
196-
}
197-
}
198186
}

services-gateway-client-transport/src/main/java/io/scalecube/services/gateway/transport/websocket/WebsocketGatewayClient.java

Lines changed: 3 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package io.scalecube.services.gateway.transport.websocket;
22

3-
import static reactor.core.publisher.Sinks.EmitResult.FAIL_NON_SERIALIZED;
3+
import static io.scalecube.reactor.RetryNonSerializedEmitFailureHandler.RETRY_NON_SERIALIZED;
44

55
import io.netty.buffer.ByteBuf;
66
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
@@ -15,9 +15,7 @@
1515
import org.slf4j.LoggerFactory;
1616
import reactor.core.publisher.Flux;
1717
import reactor.core.publisher.Mono;
18-
import reactor.core.publisher.SignalType;
1918
import reactor.core.publisher.Sinks;
20-
import reactor.core.publisher.Sinks.EmitResult;
2119
import reactor.netty.Connection;
2220
import reactor.netty.http.client.HttpClient;
2321
import reactor.netty.resources.ConnectionProvider;
@@ -78,7 +76,7 @@ public WebsocketGatewayClient(GatewayClientSettings settings, GatewayClientCodec
7876
close
7977
.asMono()
8078
.then(doClose())
81-
.doFinally(s -> onClose.emitEmpty(EmitFailureHandler.RETRY_NOT_SERIALIZED))
79+
.doFinally(s -> onClose.emitEmpty(RETRY_NON_SERIALIZED))
8280
.doOnTerminate(() -> LOGGER.info("Closed client"))
8381
.subscribe(null, ex -> LOGGER.warn("Failed to close client, cause: " + ex));
8482
}
@@ -124,7 +122,7 @@ public Flux<ServiceMessage> requestChannel(Flux<ServiceMessage> requests) {
124122

125123
@Override
126124
public void close() {
127-
close.emitEmpty(EmitFailureHandler.RETRY_NOT_SERIALIZED);
125+
close.emitEmpty(RETRY_NON_SERIALIZED);
128126
}
129127

130128
@Override
@@ -212,14 +210,4 @@ private void onReadIdle(Connection connection) {
212210
private ByteBuf encodeRequest(ServiceMessage message, long sid) {
213211
return codec.encode(ServiceMessage.from(message).header(STREAM_ID, sid).build());
214212
}
215-
216-
private static class EmitFailureHandler implements Sinks.EmitFailureHandler {
217-
218-
private static final EmitFailureHandler RETRY_NOT_SERIALIZED = new EmitFailureHandler();
219-
220-
@Override
221-
public boolean onEmitFailure(SignalType signalType, EmitResult emitResult) {
222-
return emitResult == FAIL_NON_SERIALIZED;
223-
}
224-
}
225213
}

services-gateway-client-transport/src/main/java/io/scalecube/services/gateway/transport/websocket/WebsocketGatewayClientSession.java

Lines changed: 7 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package io.scalecube.services.gateway.transport.websocket;
22

3-
import static reactor.core.publisher.Sinks.EmitResult.FAIL_NON_SERIALIZED;
3+
import static io.scalecube.reactor.RetryNonSerializedEmitFailureHandler.RETRY_NON_SERIALIZED;
44

55
import io.netty.buffer.ByteBuf;
66
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
@@ -16,9 +16,7 @@
1616
import org.slf4j.Logger;
1717
import org.slf4j.LoggerFactory;
1818
import reactor.core.publisher.Mono;
19-
import reactor.core.publisher.SignalType;
2019
import reactor.core.publisher.Sinks;
21-
import reactor.core.publisher.Sinks.EmitResult;
2220
import reactor.netty.Connection;
2321
import reactor.netty.http.websocket.WebsocketInbound;
2422
import reactor.netty.http.websocket.WebsocketOutbound;
@@ -189,31 +187,29 @@ private void handleResponse(ServiceMessage response, Object processor) {
189187
private static void emitNext(Object processor, ServiceMessage message) {
190188
if (processor instanceof Sinks.One) {
191189
//noinspection unchecked
192-
((Sinks.One<ServiceMessage>) processor)
193-
.emitValue(message, EmitFailureHandler.RETRY_NOT_SERIALIZED);
190+
((Sinks.One<ServiceMessage>) processor).emitValue(message, RETRY_NON_SERIALIZED);
194191
}
195192
if (processor instanceof Sinks.Many) {
196193
//noinspection unchecked
197-
((Sinks.Many<ServiceMessage>) processor)
198-
.emitNext(message, EmitFailureHandler.RETRY_NOT_SERIALIZED);
194+
((Sinks.Many<ServiceMessage>) processor).emitNext(message, RETRY_NON_SERIALIZED);
199195
}
200196
}
201197

202198
private static void emitComplete(Object processor) {
203199
if (processor instanceof Sinks.One) {
204-
((Sinks.One<?>) processor).emitEmpty(EmitFailureHandler.RETRY_NOT_SERIALIZED);
200+
((Sinks.One<?>) processor).emitEmpty(RETRY_NON_SERIALIZED);
205201
}
206202
if (processor instanceof Sinks.Many) {
207-
((Sinks.Many<?>) processor).emitComplete(EmitFailureHandler.RETRY_NOT_SERIALIZED);
203+
((Sinks.Many<?>) processor).emitComplete(RETRY_NON_SERIALIZED);
208204
}
209205
}
210206

211207
private static void emitError(Object processor, Exception e) {
212208
if (processor instanceof Sinks.One) {
213-
((Sinks.One<?>) processor).emitError(e, EmitFailureHandler.RETRY_NOT_SERIALIZED);
209+
((Sinks.One<?>) processor).emitError(e, RETRY_NON_SERIALIZED);
214210
}
215211
if (processor instanceof Sinks.Many) {
216-
((Sinks.Many<?>) processor).emitError(e, EmitFailureHandler.RETRY_NOT_SERIALIZED);
212+
((Sinks.Many<?>) processor).emitError(e, RETRY_NON_SERIALIZED);
217213
}
218214
}
219215

@@ -223,14 +219,4 @@ public String toString() {
223219
.add("id=" + id)
224220
.toString();
225221
}
226-
227-
private static class EmitFailureHandler implements Sinks.EmitFailureHandler {
228-
229-
private static final EmitFailureHandler RETRY_NOT_SERIALIZED = new EmitFailureHandler();
230-
231-
@Override
232-
public boolean onEmitFailure(SignalType signalType, EmitResult emitResult) {
233-
return emitResult == FAIL_NON_SERIALIZED;
234-
}
235-
}
236222
}

0 commit comments

Comments
 (0)