Skip to content
This repository was archived by the owner on Jun 21, 2023. It is now read-only.

Commit 6c1703e

Browse files
authored
Merge pull request #176 from scalecube/verify-ordering-for-ws-client
Add test to verify messages sequence in WS client
2 parents 0a53844 + d10b412 commit 6c1703e

File tree

1 file changed

+123
-0
lines changed

1 file changed

+123
-0
lines changed
Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
package io.scalecube.services.gateway.websocket;
2+
3+
import io.netty.buffer.ByteBuf;
4+
import io.scalecube.net.Address;
5+
import io.scalecube.services.Microservices;
6+
import io.scalecube.services.ServiceCall;
7+
import io.scalecube.services.annotations.Service;
8+
import io.scalecube.services.annotations.ServiceMethod;
9+
import io.scalecube.services.discovery.ScalecubeServiceDiscovery;
10+
import io.scalecube.services.gateway.BaseTest;
11+
import io.scalecube.services.gateway.TestGatewaySessionHandler;
12+
import io.scalecube.services.gateway.transport.GatewayClient;
13+
import io.scalecube.services.gateway.transport.GatewayClientCodec;
14+
import io.scalecube.services.gateway.transport.GatewayClientSettings;
15+
import io.scalecube.services.gateway.transport.GatewayClientTransport;
16+
import io.scalecube.services.gateway.transport.GatewayClientTransports;
17+
import io.scalecube.services.gateway.transport.StaticAddressRouter;
18+
import io.scalecube.services.gateway.transport.websocket.WebsocketGatewayClient;
19+
import io.scalecube.services.gateway.ws.WebsocketGateway;
20+
import io.scalecube.services.transport.rsocket.RSocketServiceTransport;
21+
import java.time.Duration;
22+
import java.util.concurrent.ThreadLocalRandom;
23+
import java.util.stream.Collectors;
24+
import java.util.stream.IntStream;
25+
import org.junit.jupiter.api.AfterAll;
26+
import org.junit.jupiter.api.AfterEach;
27+
import org.junit.jupiter.api.BeforeAll;
28+
import org.junit.jupiter.api.RepeatedTest;
29+
import reactor.core.publisher.Flux;
30+
import reactor.core.publisher.Mono;
31+
import reactor.test.StepVerifier;
32+
33+
class WebsocketClientTest extends BaseTest {
34+
35+
public static final GatewayClientCodec<ByteBuf> CLIENT_CODEC =
36+
GatewayClientTransports.WEBSOCKET_CLIENT_CODEC;
37+
38+
private static Microservices gateway;
39+
private static Address gatewayAddress;
40+
private static Microservices service;
41+
private static GatewayClient client;
42+
private static ServiceCall serviceCall;
43+
44+
@BeforeAll
45+
static void beforeAll() {
46+
gateway =
47+
Microservices.builder()
48+
.discovery("gateway", ScalecubeServiceDiscovery::new)
49+
.transport(RSocketServiceTransport::new)
50+
.gateway(
51+
options -> new WebsocketGateway(options.id("WS"), new TestGatewaySessionHandler()))
52+
.startAwait();
53+
gatewayAddress = gateway.gateway("WS").address();
54+
55+
service =
56+
Microservices.builder()
57+
.discovery(
58+
"service",
59+
serviceEndpoint ->
60+
new ScalecubeServiceDiscovery(serviceEndpoint)
61+
.membership(
62+
opts -> opts.seedMembers(gateway.discovery("gateway").address())))
63+
.transport(RSocketServiceTransport::new)
64+
.services(new TestServiceImpl())
65+
.startAwait();
66+
}
67+
68+
@AfterEach
69+
void afterEach() {
70+
final GatewayClient client = WebsocketClientTest.client;
71+
if (client != null) {
72+
client.close();
73+
}
74+
}
75+
76+
@AfterAll
77+
static void afterAll() {
78+
final GatewayClient client = WebsocketClientTest.client;
79+
if (client != null) {
80+
client.close();
81+
}
82+
Flux.concat(
83+
Mono.justOrEmpty(gateway).map(Microservices::shutdown),
84+
Mono.justOrEmpty(service).map(Microservices::shutdown))
85+
.then()
86+
.block();
87+
}
88+
89+
@RepeatedTest(300)
90+
void testCloseServiceStreamAfterLostConnection() {
91+
92+
client =
93+
new WebsocketGatewayClient(
94+
GatewayClientSettings.builder().address(gatewayAddress).build(), CLIENT_CODEC);
95+
96+
serviceCall =
97+
new ServiceCall()
98+
.transport(new GatewayClientTransport(client))
99+
.router(new StaticAddressRouter(gatewayAddress));
100+
101+
int count = ThreadLocalRandom.current().nextInt(42) + 24;
102+
103+
StepVerifier.create(serviceCall.api(TestService.class).many(count) /*.log("<<< ")*/)
104+
.expectNextSequence(IntStream.range(0, count).boxed().collect(Collectors.toList()))
105+
.expectComplete()
106+
.verify(Duration.ofSeconds(10));
107+
}
108+
109+
@Service
110+
public interface TestService {
111+
112+
@ServiceMethod("many")
113+
Flux<Integer> many(int count);
114+
}
115+
116+
private static class TestServiceImpl implements TestService {
117+
118+
@Override
119+
public Flux<Integer> many(int count) {
120+
return Flux.range(0, count);
121+
}
122+
}
123+
}

0 commit comments

Comments
 (0)