Skip to content
This repository was archived by the owner on Jun 21, 2023. It is now read-only.

Commit 8a2404c

Browse files
committed
Added WebsocketServerTest, updated WebsocketClientTest
1 parent 5d9f4f4 commit 8a2404c

File tree

2 files changed

+115
-2
lines changed

2 files changed

+115
-2
lines changed

services-gateway-tests/src/test/java/io/scalecube/services/gateway/websocket/WebsocketClientTest.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.junit.jupiter.api.RepeatedTest;
2929
import reactor.core.publisher.Flux;
3030
import reactor.core.publisher.Mono;
31+
import reactor.core.scheduler.Schedulers;
3132
import reactor.test.StepVerifier;
3233

3334
class WebsocketClientTest extends BaseTest {
@@ -97,7 +98,7 @@ void testMessageSequence() {
9798
.transport(new GatewayClientTransport(client))
9899
.router(new StaticAddressRouter(gatewayAddress));
99100

100-
int count = ThreadLocalRandom.current().nextInt(42) + 24;
101+
int count = ThreadLocalRandom.current().nextInt(1042) + 24;
101102

102103
StepVerifier.create(serviceCall.api(TestService.class).many(count) /*.log("<<< ")*/)
103104
.expectNextSequence(IntStream.range(0, count).boxed().collect(Collectors.toList()))
@@ -116,7 +117,9 @@ private static class TestServiceImpl implements TestService {
116117

117118
@Override
118119
public Flux<Integer> many(int count) {
119-
return Flux.range(0, count);
120+
return Flux.range(0, count)
121+
.subscribeOn(Schedulers.boundedElastic())
122+
.publishOn(Schedulers.boundedElastic());
120123
}
121124
}
122125
}
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
package io.scalecube.services.gateway.websocket;
2+
3+
import io.netty.buffer.ByteBuf;
4+
import io.scalecube.net.Address;
5+
import io.scalecube.services.Microservices;
6+
import io.scalecube.services.ServiceCall;
7+
import io.scalecube.services.annotations.Service;
8+
import io.scalecube.services.annotations.ServiceMethod;
9+
import io.scalecube.services.discovery.ScalecubeServiceDiscovery;
10+
import io.scalecube.services.gateway.BaseTest;
11+
import io.scalecube.services.gateway.TestGatewaySessionHandler;
12+
import io.scalecube.services.gateway.transport.GatewayClient;
13+
import io.scalecube.services.gateway.transport.GatewayClientCodec;
14+
import io.scalecube.services.gateway.transport.GatewayClientSettings;
15+
import io.scalecube.services.gateway.transport.GatewayClientTransport;
16+
import io.scalecube.services.gateway.transport.GatewayClientTransports;
17+
import io.scalecube.services.gateway.transport.StaticAddressRouter;
18+
import io.scalecube.services.gateway.transport.websocket.WebsocketGatewayClient;
19+
import io.scalecube.services.gateway.ws.WebsocketGateway;
20+
import io.scalecube.services.transport.rsocket.RSocketServiceTransport;
21+
import java.time.Duration;
22+
import java.util.concurrent.ThreadLocalRandom;
23+
import java.util.stream.Collectors;
24+
import java.util.stream.IntStream;
25+
import org.junit.jupiter.api.AfterAll;
26+
import org.junit.jupiter.api.AfterEach;
27+
import org.junit.jupiter.api.BeforeAll;
28+
import org.junit.jupiter.api.RepeatedTest;
29+
import reactor.core.publisher.Flux;
30+
import reactor.core.publisher.Mono;
31+
import reactor.core.scheduler.Schedulers;
32+
import reactor.test.StepVerifier;
33+
34+
class WebsocketServerTest extends BaseTest {
35+
36+
public static final GatewayClientCodec<ByteBuf> CLIENT_CODEC =
37+
GatewayClientTransports.WEBSOCKET_CLIENT_CODEC;
38+
39+
private static Microservices gateway;
40+
private static Address gatewayAddress;
41+
private static GatewayClient client;
42+
43+
@BeforeAll
44+
static void beforeAll() {
45+
gateway =
46+
Microservices.builder()
47+
.discovery("gateway", ScalecubeServiceDiscovery::new)
48+
.transport(RSocketServiceTransport::new)
49+
.gateway(
50+
options -> new WebsocketGateway(options.id("WS"), new TestGatewaySessionHandler()))
51+
.transport(RSocketServiceTransport::new)
52+
.services(new TestServiceImpl())
53+
.startAwait();
54+
gatewayAddress = gateway.gateway("WS").address();
55+
}
56+
57+
@AfterEach
58+
void afterEach() {
59+
final GatewayClient client = WebsocketServerTest.client;
60+
if (client != null) {
61+
client.close();
62+
}
63+
}
64+
65+
@AfterAll
66+
static void afterAll() {
67+
final GatewayClient client = WebsocketServerTest.client;
68+
if (client != null) {
69+
client.close();
70+
}
71+
Mono.justOrEmpty(gateway).map(Microservices::shutdown).then().block();
72+
}
73+
74+
@RepeatedTest(300)
75+
void testMessageSequence() {
76+
77+
client =
78+
new WebsocketGatewayClient(
79+
GatewayClientSettings.builder().address(gatewayAddress).build(), CLIENT_CODEC);
80+
81+
ServiceCall serviceCall =
82+
new ServiceCall()
83+
.transport(new GatewayClientTransport(client))
84+
.router(new StaticAddressRouter(gatewayAddress));
85+
86+
int count = ThreadLocalRandom.current().nextInt(1042) + 24;
87+
88+
StepVerifier.create(serviceCall.api(TestService.class).many(count) /*.log("<<< ")*/)
89+
.expectNextSequence(IntStream.range(0, count).boxed().collect(Collectors.toList()))
90+
.expectComplete()
91+
.verify(Duration.ofSeconds(10));
92+
}
93+
94+
@Service
95+
public interface TestService {
96+
97+
@ServiceMethod("many")
98+
Flux<Integer> many(int count);
99+
}
100+
101+
private static class TestServiceImpl implements TestService {
102+
103+
@Override
104+
public Flux<Integer> many(int count) {
105+
return Flux.range(0, count)
106+
.subscribeOn(Schedulers.boundedElastic())
107+
.publishOn(Schedulers.boundedElastic());
108+
}
109+
}
110+
}

0 commit comments

Comments
 (0)