15
15
*/
16
16
package io .fabric8 .kubernetes .client .http ;
17
17
18
- import com .sun .net .httpserver .HttpExchange ;
19
- import com .sun .net .httpserver .HttpHandler ;
20
- import com .sun .net .httpserver .HttpServer ;
18
+ import io .fabric8 .kubernetes .client .RequestConfigBuilder ;
21
19
import io .fabric8 .mockwebserver .MockWebServer ;
22
20
import io .fabric8 .mockwebserver .MockWebServerListener ;
23
21
import io .fabric8 .mockwebserver .http .MockResponse ;
24
22
import io .fabric8 .mockwebserver .http .RecordedHttpConnection ;
25
23
import io .fabric8 .mockwebserver .http .Response ;
26
24
import io .fabric8 .mockwebserver .http .WebSocketListener ;
27
25
import io .fabric8 .mockwebserver .vertx .Protocol ;
26
+ import io .vertx .core .Vertx ;
27
+ import io .vertx .core .http .HttpServer ;
28
+ import io .vertx .core .http .HttpServerOptions ;
29
+ import io .vertx .core .http .HttpServerRequest ;
30
+ import io .vertx .core .http .HttpVersion ;
31
+ import io .vertx .core .net .NetServerOptions ;
32
+ import org .awaitility .Awaitility ;
28
33
import org .junit .jupiter .api .AfterEach ;
29
34
import org .junit .jupiter .api .BeforeEach ;
30
35
import org .junit .jupiter .api .DisplayName ;
31
36
import org .junit .jupiter .api .Test ;
32
37
import org .junit .jupiter .api .condition .DisabledOnOs ;
33
38
import org .junit .jupiter .api .condition .OS ;
34
39
35
- import java .io .IOException ;
36
- import java .net .InetSocketAddress ;
37
40
import java .net .URI ;
38
41
import java .util .Collection ;
39
42
import java .util .Collections ;
42
45
import java .util .concurrent .ConcurrentHashMap ;
43
46
import java .util .concurrent .CountDownLatch ;
44
47
import java .util .concurrent .CyclicBarrier ;
45
- import java .util .concurrent .ExecutorService ;
46
- import java .util .concurrent .Executors ;
47
48
import java .util .concurrent .TimeUnit ;
48
49
import java .util .stream .IntStream ;
49
50
@@ -59,29 +60,24 @@ public abstract class AbstractSimultaneousConnectionsTest {
59
60
60
61
private RegisteredConnections registeredConnections ;
61
62
private MockWebServer mockWebServer ;
62
- private ExecutorService httpExecutor ;
63
- private HttpServer httpServer ;
63
+ private Vertx vertx ;
64
64
65
65
private HttpClient .Builder clientBuilder ;
66
66
67
67
@ BeforeEach
68
- void prepareServerAndBuilder () throws IOException {
68
+ void prepareServerAndBuilder () {
69
69
registeredConnections = new RegisteredConnections ();
70
70
mockWebServer = new MockWebServer ();
71
71
mockWebServer .addListener (registeredConnections );
72
- httpExecutor = Executors .newCachedThreadPool ();
73
- httpServer = HttpServer .create (new InetSocketAddress (0 ), 0 );
74
- httpServer .setExecutor (httpExecutor );
75
- httpServer .start ();
72
+ vertx = Vertx .vertx ();
76
73
clientBuilder = getHttpClientFactory ().newBuilder ()
77
74
.connectTimeout (60 , TimeUnit .SECONDS );
78
75
}
79
76
80
77
@ AfterEach
81
78
void stopServer () {
82
79
mockWebServer .shutdown ();
83
- httpServer .stop (0 );
84
- httpExecutor .shutdownNow ();
80
+ vertx .close ();
85
81
}
86
82
87
83
protected abstract HttpClient .Factory getHttpClientFactory ();
@@ -95,20 +91,21 @@ private void withHttp1() {
95
91
@ DisplayName ("Should be able to make 2048 simultaneous HTTP/1.x connections before processing the response" )
96
92
@ DisabledOnOs (OS .WINDOWS )
97
93
public void http1Connections () throws Exception {
98
- final DelayedResponseHandler handler = new DelayedResponseHandler (MAX_HTTP_1_CONNECTIONS ,
99
- exchange -> {
100
- exchange .sendResponseHeaders (204 , -1 );
101
- exchange .close ();
102
- });
103
- httpServer .createContext ("/http" , handler );
104
- try (final HttpClient client = clientBuilder .build ()) {
105
- final Collection <CompletableFuture <HttpResponse <AsyncBody >>> asyncResponses = ConcurrentHashMap .newKeySet ();
106
- final HttpRequest request = client .newHttpRequestBuilder ()
107
- .uri (String .format ("http://localhost:%s/http" , httpServer .getAddress ().getPort ()))
108
- .build ();
94
+ final Collection <CompletableFuture <HttpResponse <AsyncBody >>> asyncResponses = ConcurrentHashMap .newKeySet ();
95
+ try (
96
+ var server = new DelayedResponseHttp1Server (vertx , MAX_HTTP_1_CONNECTIONS );
97
+ var client = clientBuilder .tag (new RequestConfigBuilder ().withRequestRetryBackoffLimit (0 ).build ()).build ()) {
109
98
for (int it = 0 ; it < MAX_HTTP_1_CONNECTIONS ; it ++) {
99
+ final HttpRequest request = client .newHttpRequestBuilder ()
100
+ .uri (server .uri () + "?" + it )
101
+ .build ();
110
102
asyncResponses .add (client .consumeBytes (request , (value , asyncBody ) -> asyncBody .consume ()));
111
- handler .await ();
103
+ }
104
+ server .await ();
105
+ assertThat (server .requests )
106
+ .hasSize (MAX_HTTP_1_CONNECTIONS );
107
+ for (HttpServerRequest serverRequest : server .requests ) {
108
+ serverRequest .response ().setStatusCode (204 ).end ();
112
109
}
113
110
CompletableFuture .allOf (asyncResponses .toArray (new CompletableFuture [0 ])).get (70 , TimeUnit .SECONDS );
114
111
assertThat (asyncResponses )
@@ -126,19 +123,18 @@ public void http1Connections() throws Exception {
126
123
@ DisplayName ("Should be able to make 1024 simultaneous HTTP connections before upgrading to WebSocket" )
127
124
@ DisabledOnOs (OS .WINDOWS )
128
125
public void http1WebSocketConnectionsBeforeUpgrade () throws Exception {
129
- final DelayedResponseHandler handler = new DelayedResponseHandler (MAX_HTTP_1_WS_CONNECTIONS ,
130
- exchange -> exchange .sendResponseHeaders (404 , -1 ));
131
- httpServer .createContext ("/http" , handler );
132
- try (final HttpClient client = clientBuilder .build ()) {
126
+ try (var server = new DelayedResponseHttp1Server (vertx , MAX_HTTP_1_WS_CONNECTIONS ); var client = clientBuilder .build ()) {
133
127
for (int it = 0 ; it < MAX_HTTP_1_WS_CONNECTIONS ; it ++) {
134
128
client .newWebSocketBuilder ()
135
- .uri (URI .create (String . format ( "http://localhost:%s/http" , httpServer . getAddress (). getPort () )))
129
+ .uri (URI .create (server . uri ( )))
136
130
.buildAsync (new WebSocket .Listener () {
137
131
});
138
- handler .await ();
139
132
}
133
+ server .await ();
134
+ assertThat (server .requests )
135
+ .hasSize (MAX_HTTP_1_WS_CONNECTIONS );
136
+ server .requests .forEach (request -> request .response ().setStatusCode (101 ).end ());
140
137
}
141
- assertThat (handler .connectionCount .get (60 , TimeUnit .SECONDS )).isEqualTo (MAX_HTTP_1_WS_CONNECTIONS );
142
138
}
143
139
144
140
@ Test
@@ -192,47 +188,46 @@ public void onMessage(WebSocket webSocket, String text) {
192
188
}
193
189
}
194
190
195
- private static class DelayedResponseHandler implements HttpHandler {
196
-
197
- private final int requestCount ;
198
- private final CyclicBarrier barrier ;
199
- private final Set <HttpExchange > exchanges ;
200
- private final CompletableFuture <Integer > connectionCount ;
201
- private final ExecutorService executorService ;
202
-
203
- private DelayedResponseHandler (int requestCount , HttpHandler handler ) {
204
- this .requestCount = requestCount ;
205
- this .barrier = new CyclicBarrier (2 );
206
- exchanges = ConcurrentHashMap .newKeySet ();
207
- connectionCount = new CompletableFuture <>();
208
- executorService = Executors .newFixedThreadPool (1 );
209
- connectionCount .thenRunAsync (() -> {
210
- for (HttpExchange exchange : exchanges ) {
211
- try {
212
- handler .handle (exchange );
213
- } catch (IOException ignore ) {
214
- // NO OP
215
- }
216
- }
217
- }, executorService )
218
- .whenComplete ((unused , throwable ) -> executorService .shutdownNow ());
191
+ private static class DelayedResponseHttp1Server implements AutoCloseable {
192
+
193
+ private final int connections ;
194
+ private final HttpServer httpServer ;
195
+ private final Collection <HttpServerRequest > requests ;
196
+ private final CountDownLatch connectionLatch ;
197
+
198
+ private DelayedResponseHttp1Server (Vertx vertx , int connections ) throws Exception {
199
+ this .connections = connections ;
200
+ requests = ConcurrentHashMap .newKeySet ();
201
+ connectionLatch = new CountDownLatch (connections );
202
+ httpServer = vertx .createHttpServer (new HttpServerOptions ()
203
+ .setPort (NetServerOptions .DEFAULT_PORT )
204
+ .setAlpnVersions (Collections .singletonList (HttpVersion .HTTP_1_1 )));
205
+ httpServer .connectionHandler (event -> connectionLatch .countDown ());
206
+ httpServer .requestHandler (requests ::add );
207
+ httpServer .listen ().toCompletionStage ().toCompletableFuture ().get (10 , TimeUnit .SECONDS );
219
208
}
220
209
221
210
@ Override
222
- public void handle (HttpExchange exchange ) {
223
- exchanges .add (exchange );
224
- await ();
225
- if (exchanges .size () == requestCount ) {
226
- connectionCount .complete (requestCount );
227
- }
211
+ public void close () throws Exception {
212
+ requests .forEach (request -> request .connection ().close ());
213
+ requests .clear ();
214
+ httpServer .close ().toCompletionStage ().toCompletableFuture ().get (10 , TimeUnit .SECONDS );
215
+ }
228
216
217
+ private String uri () {
218
+ return String .format ("http://localhost:%s/http-1-connections" , httpServer .actualPort ());
229
219
}
230
220
231
- public final void await () {
221
+ private void await () {
232
222
try {
233
- barrier .await (5 , TimeUnit .SECONDS );
234
- } catch (Exception ex ) {
235
- throw new RuntimeException ("Failed to await the barrier" );
223
+ if (!connectionLatch .await (10 , TimeUnit .SECONDS )) {
224
+ throw new AssertionError (
225
+ "Failed to await the connection latch, remaining connections to open: " + connectionLatch .getCount ());
226
+ }
227
+ Awaitility .await ().atMost (5 , TimeUnit .SECONDS ).until (() -> requests .size () == connections );
228
+ } catch (InterruptedException e ) {
229
+ Thread .currentThread ().interrupt ();
230
+ throw new RuntimeException ("Failed to await the connection latch (interrupted)" , e );
236
231
}
237
232
}
238
233
}
0 commit comments