10
10
-include_lib (" common_test/include/ct.hrl" ).
11
11
-include_lib (" amqp_client/include/amqp_client.hrl" ).
12
12
-include_lib (" eunit/include/eunit.hrl" ).
13
+ -include_lib (" rabbitmq_ct_helpers/include/rabbit_assert.hrl" ).
13
14
14
15
-compile (nowarn_export_all ).
15
16
-compile (export_all ).
@@ -41,7 +42,8 @@ groups() ->
41
42
[{cluster_size_3 , [], [
42
43
force_standalone_boot ,
43
44
force_standalone_boot_and_restart ,
44
- force_standalone_boot_and_restart_with_quorum_queues
45
+ force_standalone_boot_and_restart_with_quorum_queues ,
46
+ recover_after_partition_with_leader
45
47
]}
46
48
]},
47
49
{clustered_5_nodes , [],
@@ -66,7 +68,9 @@ suite() ->
66
68
67
69
init_per_suite (Config ) ->
68
70
rabbit_ct_helpers :log_environment (),
69
- rabbit_ct_helpers :run_setup_steps (Config ).
71
+ rabbit_ct_helpers :run_setup_steps (
72
+ Config ,
73
+ [fun rabbit_ct_broker_helpers :configure_dist_proxy /1 ]).
70
74
71
75
end_per_suite (Config ) ->
72
76
rabbit_ct_helpers :run_teardown_steps (Config ).
@@ -249,6 +253,172 @@ force_standalone_boot_and_restart_with_quorum_queues(Config) ->
249
253
250
254
ok .
251
255
256
+ recover_after_partition_with_leader (Config ) ->
257
+ Nodes = rabbit_ct_broker_helpers :get_node_configs (Config , nodename ),
258
+
259
+ % % We use intermediate Erlang nodes between the common_test control node
260
+ % % and the RabbitMQ nodes, using `peer' standard_io communication. The goal
261
+ % % is to make sure the common_test control node doesn't interfere with the
262
+ % % nodes the RabbitMQ nodes can see, despite the blocking of the Erlang
263
+ % % distribution connection.
264
+ Proxies0 = [begin
265
+ {ok , Proxy , PeerNode } = peer :start_link (
266
+ #{name => peer :random_name (),
267
+ connection => standard_io ,
268
+ wait_boot => 120000 }),
269
+ ct :pal (" Proxy ~0p -> ~0p " , [Proxy , PeerNode ]),
270
+ Proxy
271
+ end || _ <- Nodes ],
272
+ Proxies = maps :from_list (lists :zip (Nodes , Proxies0 )),
273
+ ct :pal (" Proxies: ~p " , [Proxies ]),
274
+ Config1 = [{proxies , Proxies } | Config ],
275
+
276
+ NodeA = hd (Nodes ),
277
+
278
+ ct :pal (" Prevent automatic reconnection on the common_test node" ),
279
+ application :set_env (kernel , dist_auto_connect , never ),
280
+ ct :pal (" Disconnect the common_test node from RabbitMQ nodes" ),
281
+ lists :foreach (fun erlang :disconnect_node /1 , Nodes ),
282
+ ct :pal (
283
+ " Ensure RabbitMQ nodes only know about the RabbitMQ nodes "
284
+ " (and their proxy)" ),
285
+ lists :foreach (
286
+ fun (Node ) ->
287
+ ? awaitMatch (
288
+ Nodes ,
289
+ get_connected_nodes (Config1 , Node ),
290
+ 30000 )
291
+ end , Nodes ),
292
+
293
+ ct :pal (" Wait for a Khepri leader to be elected" ),
294
+ ? awaitMatch ({ok , _ }, get_leader_node (Config1 , NodeA ), 30000 ),
295
+
296
+ ct :pal (" Query the Khepri leader nodename" ),
297
+ {ok , Leader } = get_leader_node (Config1 , NodeA ),
298
+ Followers = Nodes -- [Leader ],
299
+ ct :pal (" Leader: ~0p~n Followers: ~p " , [Leader , Followers ]),
300
+
301
+ lists :foreach (
302
+ fun (Follower ) ->
303
+ ct :pal (
304
+ ? LOW_IMPORTANCE ,
305
+ " Blocking traffic between ~ts and ~ts " ,
306
+ [Leader , Follower ]),
307
+ ? assertEqual (
308
+ ok ,
309
+ proxied_rpc (
310
+ Config1 , Leader , inet_tcp_proxy_dist , block , [Follower ])),
311
+ ? assertEqual (
312
+ ok ,
313
+ proxied_rpc (
314
+ Config1 , Follower , inet_tcp_proxy_dist , block , [Leader ]))
315
+ end , Followers ),
316
+
317
+ ct :pal (
318
+ " Ensure the leader node is disconnected from other RabbitMQ nodes" ),
319
+ ? awaitMatch (
320
+ [Leader ],
321
+ get_connected_nodes (Config1 , Leader ),
322
+ 30000 ),
323
+ ct :pal (
324
+ " Ensure the follower nodes are disconnected from the leader node" ),
325
+ lists :foreach (
326
+ fun (Follower ) ->
327
+ ? awaitMatch (
328
+ Followers ,
329
+ get_connected_nodes (Config1 , Follower ),
330
+ 30000 )
331
+ end , Followers ),
332
+
333
+ ct :pal (" Wait for each side of the partition to have its own leader" ),
334
+ Follower1 = hd (Followers ),
335
+ ? awaitMatch (
336
+ false ,
337
+ begin
338
+ LeaderA = get_leader_node (Config1 , Leader ),
339
+ LeaderB = get_leader_node (Config1 , Follower1 ),
340
+ ct :pal (" LeaderA: ~0p~n LeaderB: ~0p " , [LeaderA , LeaderB ]),
341
+ LeaderA =:= LeaderB
342
+ end ,
343
+ 30000 ),
344
+
345
+ ct :pal (" Waiting for 2 minutes" ),
346
+ timer :sleep (120000 ),
347
+
348
+ ct :pal (" Query Khepri status for each RabbitMQ node" ),
349
+ PerNodeStatus1 = get_per_node_khepri_status (Config1 ),
350
+ ct :pal (" Per-node Khepri status (during partition):~n~p " , [PerNodeStatus1 ]),
351
+
352
+ lists :foreach (
353
+ fun (Follower ) ->
354
+ ct :pal (
355
+ ? LOW_IMPORTANCE ,
356
+ " Unblocking traffic between ~ts and ~ts " ,
357
+ [Leader , Follower ]),
358
+ ? assertEqual (
359
+ ok ,
360
+ proxied_rpc (
361
+ Config1 , Leader , inet_tcp_proxy_dist , allow , [Follower ])),
362
+ ? assertEqual (
363
+ ok ,
364
+ proxied_rpc (
365
+ Config1 , Follower , inet_tcp_proxy_dist , allow , [Leader ]))
366
+ end , Followers ),
367
+
368
+ ct :pal (" Wait for the whole cluster to agree on the same leader" ),
369
+ ? awaitMatch (
370
+ true ,
371
+ begin
372
+ LeaderA = get_leader_node (Config1 , Leader ),
373
+ LeaderB = get_leader_node (Config1 , Follower1 ),
374
+ ct :pal (" LeaderA: ~0p~n LeaderB: ~0p " , [LeaderA , LeaderB ]),
375
+ LeaderA =:= LeaderB
376
+ end ,
377
+ 30000 ),
378
+
379
+ ct :pal (" Query Khepri status for each RabbitMQ node" ),
380
+ PerNodeStatus2 = get_per_node_khepri_status (Config1 ),
381
+ ct :pal (" Per-node Khepri status (after recovery):~n~p " , [PerNodeStatus2 ]),
382
+
383
+ ct :pal (" Restore automatic reconnection on the common_test node" ),
384
+ application :unset_env (kernel , dist_auto_connect ),
385
+ ok .
386
+
387
+ proxied_rpc (Config , Node , Module , Function , Args ) ->
388
+ Proxies = ? config (proxies , Config ),
389
+ Proxy = maps :get (Node , Proxies ),
390
+ peer :call (
391
+ Proxy , rabbit_ct_broker_helpers , rpc ,
392
+ [Config , Node , Module , Function , Args ]).
393
+
394
+ get_leader_node (Config , Node ) ->
395
+ StoreId = rabbit_khepri :get_store_id (),
396
+ Ret = proxied_rpc (
397
+ Config , Node ,
398
+ ra_leaderboard , lookup_leader , [StoreId ]),
399
+ case Ret of
400
+ {StoreId , LeaderNode } ->
401
+ {ok , LeaderNode };
402
+ undefined ->
403
+ {error , no_leader }
404
+ end .
405
+
406
+ get_connected_nodes (Config , Node ) ->
407
+ Proxies = ? config (proxies , Config ),
408
+ Proxy = maps :get (Node , Proxies ),
409
+ Peer = peer :call (Proxy , erlang , node , []),
410
+ OtherNodes = proxied_rpc (Config , Node , erlang , nodes , []),
411
+ lists :sort ([Node | OtherNodes -- [Peer ]]).
412
+
413
+ get_per_node_khepri_status (Config ) ->
414
+ Nodes = rabbit_ct_broker_helpers :get_node_configs (Config , nodename ),
415
+ maps :from_list (
416
+ lists :map (
417
+ fun (Node ) ->
418
+ Status = proxied_rpc (Config , Node , rabbit_khepri , status , []),
419
+ {Node , Status }
420
+ end , Nodes )).
421
+
252
422
rolling_restart (Config ) ->
253
423
Nodes = rabbit_ct_broker_helpers :get_node_configs (Config , nodename ),
254
424
0 commit comments