16
16
17
17
package com .hivemq .extensions .cluster .discovery .azure ;
18
18
19
- import com .azure .storage .blob .BlobClient ;
20
- import com .azure .storage .blob .BlobContainerClient ;
21
19
import com .azure .storage .blob .BlobContainerClientBuilder ;
22
- import com .azure .storage .blob .models .BlobItem ;
23
20
import io .github .sgtsilvio .gradle .oci .junit .jupiter .OciImages ;
24
21
import org .jetbrains .annotations .NotNull ;
25
22
import org .junit .jupiter .api .AfterEach ;
34
31
import org .testcontainers .utility .MountableFile ;
35
32
36
33
import java .io .ByteArrayInputStream ;
37
- import java .util .List ;
38
34
import java .util .concurrent .TimeoutException ;
39
35
import java .util .stream .Collectors ;
40
36
41
37
import static java .util .concurrent .TimeUnit .SECONDS ;
38
+ import static org .assertj .core .api .Assertions .assertThat ;
42
39
import static org .awaitility .Awaitility .await ;
43
- import static org .junit .jupiter .api .Assertions .assertEquals ;
44
- import static org .junit .jupiter .api .Assertions .assertFalse ;
45
- import static org .junit .jupiter .api .Assertions .assertTrue ;
46
40
47
41
@ SuppressWarnings ("resource" )
48
42
class AzureDiscoveryExtensionIT {
@@ -53,32 +47,32 @@ class AzureDiscoveryExtensionIT {
53
47
private static final @ NotNull String BLOB_CONTAINER_NAME = "hivemq-discovery" ;
54
48
55
49
private final @ NotNull Network network = Network .newNetwork ();
56
- private final @ NotNull GenericContainer <?> azureriteContainer =
50
+ private final @ NotNull GenericContainer <?> azuriteContainer =
57
51
new GenericContainer <>(OciImages .getImageName ("azure-storage/azurite" )) //
58
52
.withExposedPorts (AZURITE_PORT ) //
59
53
.withNetwork (network ) //
60
54
.withNetworkAliases (AZURITE_NETWORK_ALIAS );
61
55
62
56
@ BeforeEach
63
57
void setUp () {
64
- azureriteContainer .start ();
58
+ azuriteContainer .start ();
65
59
}
66
60
67
61
@ AfterEach
68
62
void tearDown () {
69
- azureriteContainer .stop ();
63
+ azuriteContainer .stop ();
70
64
network .close ();
71
65
}
72
66
73
67
@ Test
74
68
void threeNodesFormCluster () throws TimeoutException {
75
- final WaitingConsumer consumer1 = new WaitingConsumer ();
76
- final WaitingConsumer consumer2 = new WaitingConsumer ();
77
- final WaitingConsumer consumer3 = new WaitingConsumer ();
69
+ final var consumer1 = new WaitingConsumer ();
70
+ final var consumer2 = new WaitingConsumer ();
71
+ final var consumer3 = new WaitingConsumer ();
78
72
79
- final HiveMQContainer node1 = createHiveMQNode ().withLogConsumer (consumer1 );
80
- final HiveMQContainer node2 = createHiveMQNode ().withLogConsumer (consumer2 );
81
- final HiveMQContainer node3 = createHiveMQNode ().withLogConsumer (consumer3 );
73
+ final var node1 = createHiveMQNode ().withLogConsumer (consumer1 );
74
+ final var node2 = createHiveMQNode ().withLogConsumer (consumer2 );
75
+ final var node3 = createHiveMQNode ().withLogConsumer (consumer3 );
82
76
83
77
try (node1 ; node2 ; node3 ) {
84
78
node1 .start ();
@@ -93,114 +87,112 @@ void threeNodesFormCluster() throws TimeoutException {
93
87
94
88
@ Test
95
89
void twoNodesInCluster_oneNodeStarted_threeNodesInCluster () throws TimeoutException {
96
- final WaitingConsumer consumer1 = new WaitingConsumer ();
97
- final WaitingConsumer consumer2 = new WaitingConsumer ();
98
- final WaitingConsumer consumer3 = new WaitingConsumer ();
99
-
100
- final HiveMQContainer node1 = createHiveMQNode ().withLogConsumer (consumer1 );
101
- final HiveMQContainer node2 = createHiveMQNode ().withLogConsumer (consumer2 );
102
- final HiveMQContainer node3 = createHiveMQNode ().withLogConsumer (consumer3 );
90
+ final var consumer1 = new WaitingConsumer ();
91
+ final var consumer2 = new WaitingConsumer ();
92
+ final var consumer3 = new WaitingConsumer ();
103
93
94
+ final var node1 = createHiveMQNode ().withLogConsumer (consumer1 );
95
+ final var node2 = createHiveMQNode ().withLogConsumer (consumer2 );
96
+ final var node3 = createHiveMQNode ().withLogConsumer (consumer3 );
104
97
try (node1 ; node2 ; node3 ) {
105
98
node1 .start ();
106
99
node2 .start ();
107
-
108
100
consumer1 .waitUntil (frame -> frame .getUtf8String ().contains ("Cluster size = 2" ), 30 , SECONDS );
109
101
consumer2 .waitUntil (frame -> frame .getUtf8String ().contains ("Cluster size = 2" ), 30 , SECONDS );
110
102
111
103
node3 .start ();
112
-
113
104
consumer3 .waitUntil (frame -> frame .getUtf8String ().contains ("Cluster size = 3" ), 300 , SECONDS );
114
105
}
115
106
}
116
107
117
108
@ Test
118
109
void twoNodesInCluster_oneNodeCannotReachAzure_nodeFileDeleted () throws TimeoutException {
119
- final ToxiproxyContainer toxiproxy = new ToxiproxyContainer (OciImages .getImageName ("shopify/toxiproxy" )) //
110
+ final var toxiproxy = new ToxiproxyContainer (OciImages .getImageName ("shopify/toxiproxy" )) //
120
111
.withNetwork (network ).withNetworkAliases (TOXIPROXY_NETWORK_ALIAS );
121
112
try (toxiproxy ) {
122
113
toxiproxy .start ();
123
114
124
- final ToxiproxyContainer .ContainerProxy proxy = toxiproxy .getProxy (azureriteContainer , AZURITE_PORT );
125
- final String toxiproxyConnectionString = createAzuriteConnectionString (TOXIPROXY_NETWORK_ALIAS , proxy .getOriginalProxyPort ());
115
+ final var proxy = toxiproxy .getProxy (azuriteContainer , AZURITE_PORT );
116
+ final var toxiproxyConnectionString =
117
+ createAzuriteConnectionString (TOXIPROXY_NETWORK_ALIAS , proxy .getOriginalProxyPort ());
126
118
127
- final WaitingConsumer toxicConsumer = new WaitingConsumer ();
128
- final WaitingConsumer normalConsumer = new WaitingConsumer ();
129
-
130
- final HiveMQContainer toxicNode =
131
- createHiveMQNode (toxiproxyConnectionString ).withLogConsumer (toxicConsumer );
132
- final HiveMQContainer normalNode = createHiveMQNode ().withLogConsumer (normalConsumer );
119
+ final var toxicConsumer = new WaitingConsumer ();
120
+ final var normalConsumer = new WaitingConsumer ();
133
121
122
+ final var toxicNode = createHiveMQNode (toxiproxyConnectionString ).withLogConsumer (toxicConsumer );
123
+ final var normalNode = createHiveMQNode ().withLogConsumer (normalConsumer );
134
124
try (toxicNode ; normalNode ) {
135
125
toxicNode .start ();
136
126
normalNode .start ();
137
-
138
127
toxicConsumer .waitUntil (frame -> frame .getUtf8String ().contains ("Cluster size = 2" ), 30 , SECONDS );
139
128
normalConsumer .waitUntil (frame -> frame .getUtf8String ().contains ("Cluster size = 2" ), 30 , SECONDS );
140
129
141
- proxy .setConnectionCut (true ); // toxicNode now cannot update its node file
130
+ // toxicNode now cannot update its node file
131
+ proxy .setConnectionCut (true );
142
132
143
- final BlobContainerClient blobContainerClient = new BlobContainerClientBuilder ().connectionString (createHostAzuriteConnectionString ())
144
- .containerName (BLOB_CONTAINER_NAME )
145
- .buildClient ();
146
-
147
- await ().pollInterval (1 , SECONDS ).atMost (60 , SECONDS ).until (() -> blobContainerClient .listBlobs ().stream ().count () == 1 );
133
+ final var blobContainerClient =
134
+ new BlobContainerClientBuilder ().connectionString (createHostAzuriteConnectionString ())
135
+ .containerName (BLOB_CONTAINER_NAME )
136
+ .buildClient ();
137
+ await ().pollInterval (1 , SECONDS )
138
+ .atMost (60 , SECONDS )
139
+ .until (() -> blobContainerClient .listBlobs ().stream ().count () == 1 );
148
140
}
149
141
}
150
142
}
151
143
152
144
@ Test
153
145
void threeNodesInCluster_oneNodeStopped_twoNodesInCluster () throws TimeoutException {
154
- final WaitingConsumer consumer1 = new WaitingConsumer ();
155
- final WaitingConsumer consumer2 = new WaitingConsumer ();
156
- final WaitingConsumer consumer3 = new WaitingConsumer ();
157
-
158
- final HiveMQContainer node1 = createHiveMQNode ().withLogConsumer (consumer1 );
159
- final HiveMQContainer node2 = createHiveMQNode ().withLogConsumer (consumer2 );
160
- final HiveMQContainer node3 = createHiveMQNode ().withLogConsumer (consumer3 );
146
+ final var consumer1 = new WaitingConsumer ();
147
+ final var consumer2 = new WaitingConsumer ();
148
+ final var consumer3 = new WaitingConsumer ();
161
149
150
+ final var node1 = createHiveMQNode ().withLogConsumer (consumer1 );
151
+ final var node2 = createHiveMQNode ().withLogConsumer (consumer2 );
152
+ final var node3 = createHiveMQNode ().withLogConsumer (consumer3 );
162
153
try (node1 ; node2 ; node3 ) {
163
154
node1 .start ();
164
155
node2 .start ();
165
156
node3 .start ();
166
-
167
157
consumer1 .waitUntil (frame -> frame .getUtf8String ().contains ("Cluster size = 3" ), 30 , SECONDS );
168
158
consumer2 .waitUntil (frame -> frame .getUtf8String ().contains ("Cluster size = 3" ), 30 , SECONDS );
169
159
consumer3 .waitUntil (frame -> frame .getUtf8String ().contains ("Cluster size = 3" ), 30 , SECONDS );
170
160
171
161
node3 .stop ();
172
-
173
162
consumer1 .waitUntil (frame -> frame .getUtf8String ().contains ("Cluster size = 2" ), 30 , SECONDS , 2 );
174
163
consumer2 .waitUntil (frame -> frame .getUtf8String ().contains ("Cluster size = 2" ), 30 , SECONDS , 2 );
175
164
176
- final BlobContainerClient blobContainerClient = new BlobContainerClientBuilder (). connectionString ( createHostAzuriteConnectionString ())
177
- . containerName ( BLOB_CONTAINER_NAME )
178
- . buildClient ();
179
-
180
- final List < BlobItem > blobs = blobContainerClient . listBlobs (). stream (). collect ( Collectors . toList ());
181
-
182
- assertEquals ( 3 , blobs .size ()); // Blob did not yet expire
165
+ final var blobContainerClient =
166
+ new BlobContainerClientBuilder (). connectionString ( createHostAzuriteConnectionString () )
167
+ . containerName ( BLOB_CONTAINER_NAME )
168
+ . buildClient ();
169
+ // blob did not yet expire
170
+ final var blobs = blobContainerClient . listBlobs (). stream (). collect ( Collectors . toList ());
171
+ assertThat ( blobs .size ()). isEqualTo ( 3 );
183
172
}
184
173
}
185
174
186
175
@ Test
176
+ @ SuppressWarnings ("HttpUrlsUsage" )
187
177
void wrongConnectionString_reloadRightConnectionString_clusterCreated () throws TimeoutException {
188
- final String wrongConnectionString = "DefaultEndpointsProtocol=http;" + //
178
+ final var wrongConnectionString = "DefaultEndpointsProtocol=http;" +
189
179
"AccountName=devstoreaccount1;" +
190
180
"AccountKey=XXX8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;" +
191
- "BlobEndpoint=http://" + AZURITE_NETWORK_ALIAS + ":" + AZURITE_PORT + "/devstoreaccount1" ;
192
-
193
- final WaitingConsumer consumer = new WaitingConsumer ();
181
+ "BlobEndpoint=http://" +
182
+ AZURITE_NETWORK_ALIAS +
183
+ ":" +
184
+ AZURITE_PORT +
185
+ "/devstoreaccount1" ;
194
186
195
- final HiveMQContainer reloadingNode = createHiveMQNode (wrongConnectionString ).withLogConsumer (consumer );
196
- final HiveMQContainer normalNode = createHiveMQNode ();
187
+ final var consumer = new WaitingConsumer ();
197
188
189
+ final var reloadingNode = createHiveMQNode (wrongConnectionString ).withLogConsumer (consumer );
190
+ final var normalNode = createHiveMQNode ();
198
191
try (reloadingNode ; normalNode ) {
199
192
reloadingNode .start ();
200
193
normalNode .start ();
201
194
202
- reloadingNode .copyFileToContainer (
203
- Transferable .of (createConfig (createDockerAzuriteConnectionString ()).getBytes ()),
195
+ reloadingNode .copyFileToContainer (Transferable .of (createConfig (createDockerAzuriteConnectionString ()).getBytes ()),
204
196
"/opt/hivemq/extensions/hivemq-azure-cluster-discovery-extension/azDiscovery.properties" );
205
197
206
198
consumer .waitUntil (frame -> frame .getUtf8String ().contains ("Cluster size = 2" ), 90 , SECONDS );
@@ -209,47 +201,51 @@ void wrongConnectionString_reloadRightConnectionString_clusterCreated() throws T
209
201
210
202
@ Test
211
203
void containerNotExisting_nodeStarted_containerCreated () {
212
- final BlobContainerClient blobContainerClient = new BlobContainerClientBuilder ().connectionString (createHostAzuriteConnectionString ())
213
- .containerName (BLOB_CONTAINER_NAME )
214
- .buildClient ();
215
-
216
- assertFalse (blobContainerClient .exists ());
217
-
218
- final HiveMQContainer node = createHiveMQNode ();
204
+ final var blobContainerClient =
205
+ new BlobContainerClientBuilder ().connectionString (createHostAzuriteConnectionString ())
206
+ .containerName (BLOB_CONTAINER_NAME )
207
+ .buildClient ();
208
+ assertThat (blobContainerClient .exists ()).isFalse ();
219
209
210
+ final var node = createHiveMQNode ();
220
211
try (node ) {
221
212
node .start ();
222
-
223
- assertTrue (blobContainerClient .exists ());
213
+ assertThat (blobContainerClient .exists ()).isTrue ();
224
214
}
225
215
}
226
216
227
217
@ Test
228
218
void containerExisting_nodeStarted_containerUsed () {
229
- final BlobContainerClient blobContainerClient = new BlobContainerClientBuilder ().connectionString (createHostAzuriteConnectionString ())
230
- .containerName (BLOB_CONTAINER_NAME )
231
- .buildClient ();
232
- final BlobClient blob = blobContainerClient .getBlobClient ("blob" );
219
+ final var blobContainerClient =
220
+ new BlobContainerClientBuilder ().connectionString (createHostAzuriteConnectionString ())
221
+ .containerName (BLOB_CONTAINER_NAME )
222
+ .buildClient ();
223
+ final var blob = blobContainerClient .getBlobClient ("blob" );
233
224
blobContainerClient .create ();
234
225
blob .upload (new ByteArrayInputStream ("Test" .getBytes ()), "Test" .getBytes ().length );
235
226
236
- final WaitingConsumer consumer = new WaitingConsumer ();
237
-
238
- final HiveMQContainer node = createHiveMQNode ().withLogConsumer (consumer );
227
+ final var consumer = new WaitingConsumer ();
239
228
229
+ final var node = createHiveMQNode ().withLogConsumer (consumer );
240
230
try (node ) {
241
231
node .start ();
242
232
243
- final List < BlobItem > blobs = blobContainerClient .listBlobs ().stream ().collect (Collectors .toList ());
244
- assertEquals ( 2 , blobs .size ());
233
+ final var blobs = blobContainerClient .listBlobs ().stream ().collect (Collectors .toList ());
234
+ assertThat ( blobs .size ()). isEqualTo ( 2 );
245
235
}
246
236
}
247
237
238
+ @ SuppressWarnings ("HttpUrlsUsage" )
248
239
private @ NotNull String createAzuriteConnectionString (final @ NotNull String host , final int port ) {
249
- return "DefaultEndpointsProtocol=http;" + //
240
+ return "DefaultEndpointsProtocol=http;" +
241
+ //
250
242
"AccountName=devstoreaccount1;" +
251
243
"AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;" +
252
- "BlobEndpoint=http://" + host + ":" + port + "/devstoreaccount1" ;
244
+ "BlobEndpoint=http://" +
245
+ host +
246
+ ":" +
247
+ port +
248
+ "/devstoreaccount1" ;
253
249
}
254
250
255
251
private @ NotNull String createDockerAzuriteConnectionString () {
@@ -265,7 +261,7 @@ void containerExisting_nodeStarted_containerUsed() {
265
261
}
266
262
267
263
private @ NotNull String createHostAzuriteConnectionString () {
268
- return createAzuriteConnectionString ("127.0.0.1" , azureriteContainer .getMappedPort (AZURITE_PORT ));
264
+ return createAzuriteConnectionString ("127.0.0.1" , azuriteContainer .getMappedPort (AZURITE_PORT ));
269
265
}
270
266
271
267
private @ NotNull HiveMQContainer createHiveMQNode (final @ NotNull String connectionString ) {
0 commit comments