Skip to content

clustering_recovery_SUITE: Add recover_after_partition_with_leader testcase #14384

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Aug 14, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
174 changes: 172 additions & 2 deletions deps/rabbit/test/clustering_recovery_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
-include_lib("common_test/include/ct.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl").

-compile(nowarn_export_all).
-compile(export_all).
Expand Down Expand Up @@ -41,7 +42,8 @@ groups() ->
[{cluster_size_3, [], [
force_standalone_boot,
force_standalone_boot_and_restart,
force_standalone_boot_and_restart_with_quorum_queues
force_standalone_boot_and_restart_with_quorum_queues,
recover_after_partition_with_leader
]}
]},
{clustered_5_nodes, [],
Expand All @@ -66,7 +68,9 @@ suite() ->

%% Suite-level setup.
%%
%% Logs the test environment, then runs the setup steps with
%% `rabbit_ct_broker_helpers:configure_dist_proxy/1' passed as an extra step.
%% NOTE(review): the extra step is presumably what makes the
%% `inet_tcp_proxy_dist:block/1' and `inet_tcp_proxy_dist:allow/1' calls in
%% `recover_after_partition_with_leader/1' effective -- confirm against
%% rabbit_ct_broker_helpers.
init_per_suite(Config) ->
    rabbit_ct_helpers:log_environment(),
    rabbit_ct_helpers:run_setup_steps(
      Config,
      [fun rabbit_ct_broker_helpers:configure_dist_proxy/1]).

%% Suite-level teardown: hands `Config' to the teardown steps and returns
%% whatever they return.
end_per_suite(Config) ->
    TeardownResult = rabbit_ct_helpers:run_teardown_steps(Config),
    TeardownResult.
Expand Down Expand Up @@ -249,6 +253,172 @@ force_standalone_boot_and_restart_with_quorum_queues(Config) ->

ok.

%% Testcase: partition the Khepri leader away from its followers, wait, heal
%% the partition and check that the whole cluster agrees on one leader again.
%%
%% Steps:
%%   1. Start one `peer' proxy node per RabbitMQ node and stop the
%%      common_test node from (re)connecting to the RabbitMQ nodes.
%%   2. Find the current leader and block distribution traffic between it
%%      and every follower, in both directions.
%%   3. Wait until both sides of the partition report different leaders,
%%      then sleep for two minutes.
%%   4. Unblock the traffic and wait until both sides report the same leader.
%%
%% The `dist_auto_connect' kernel parameter of the common_test node is
%% modified for the duration of the test. It is restored to its previous
%% value in an `after' clause so that a failing assertion or an
%% `?awaitMatch' timeout doesn't leak the `never' setting into later
%% testcases.
%%
%% Returns `ok' on success; raises if any assertion or wait fails.
recover_after_partition_with_leader(Config) ->
    %% `get_connected_nodes/2' returns a sorted list. Sort the reference list
    %% too so the `?awaitMatch' comparisons below (against `Nodes' and
    %% `Followers') don't depend on the order of nodes in the configuration.
    Nodes = lists:sort(
              rabbit_ct_broker_helpers:get_node_configs(Config, nodename)),

    %% We use intermediate Erlang nodes between the common_test control node
    %% and the RabbitMQ nodes, using `peer' standard_io communication. The goal
    %% is to make sure the common_test control node doesn't interfere with the
    %% nodes the RabbitMQ nodes can see, despite the blocking of the Erlang
    %% distribution connection.
    %%
    %% The proxies are started with `peer:start_link/1', so they are linked
    %% to the process running this testcase.
    Proxies0 = [begin
                    {ok, Proxy, PeerNode} = peer:start_link(
                                              #{name => peer:random_name(),
                                                connection => standard_io,
                                                wait_boot => 120000}),
                    ct:pal("Proxy ~0p -> ~0p", [Proxy, PeerNode]),
                    Proxy
                end || _ <- Nodes],
    Proxies = maps:from_list(lists:zip(Nodes, Proxies0)),
    ct:pal("Proxies: ~p", [Proxies]),
    Config1 = [{proxies, Proxies} | Config],

    NodeA = hd(Nodes),

    ct:pal("Prevent automatic reconnection on the common_test node"),
    %% Remember the previous setting so it can be restored exactly.
    PrevAutoConnect = application:get_env(kernel, dist_auto_connect),
    application:set_env(kernel, dist_auto_connect, never),
    try
        ct:pal("Disconnect the common_test node from RabbitMQ nodes"),
        lists:foreach(fun erlang:disconnect_node/1, Nodes),
        ct:pal(
          "Ensure RabbitMQ nodes only know about the RabbitMQ nodes "
          "(and their proxy)"),
        lists:foreach(
          fun(Node) ->
                  ?awaitMatch(
                     Nodes,
                     get_connected_nodes(Config1, Node),
                     30000)
          end, Nodes),

        ct:pal("Wait for a Khepri leader to be elected"),
        ?awaitMatch({ok, _}, get_leader_node(Config1, NodeA), 30000),

        ct:pal("Query the Khepri leader nodename"),
        {ok, Leader} = get_leader_node(Config1, NodeA),
        Followers = Nodes -- [Leader],
        ct:pal("Leader: ~0p~nFollowers: ~p", [Leader, Followers]),

        %% Block both directions so neither side can re-establish the link.
        lists:foreach(
          fun(Follower) ->
                  ct:pal(
                    ?LOW_IMPORTANCE,
                    "Blocking traffic between ~ts and ~ts",
                    [Leader, Follower]),
                  ?assertEqual(
                     ok,
                     proxied_rpc(
                       Config1, Leader,
                       inet_tcp_proxy_dist, block, [Follower])),
                  ?assertEqual(
                     ok,
                     proxied_rpc(
                       Config1, Follower,
                       inet_tcp_proxy_dist, block, [Leader]))
          end, Followers),

        ct:pal(
          "Ensure the leader node is disconnected from other RabbitMQ nodes"),
        ?awaitMatch(
           [Leader],
           get_connected_nodes(Config1, Leader),
           30000),
        ct:pal(
          "Ensure the follower nodes are disconnected from the leader node"),
        lists:foreach(
          fun(Follower) ->
                  ?awaitMatch(
                     Followers,
                     get_connected_nodes(Config1, Follower),
                     30000)
          end, Followers),

        ct:pal("Wait for each side of the partition to have its own leader"),
        Follower1 = hd(Followers),
        ?awaitMatch(
           false,
           begin
               LeaderA = get_leader_node(Config1, Leader),
               LeaderB = get_leader_node(Config1, Follower1),
               ct:pal("LeaderA: ~0p~nLeaderB: ~0p", [LeaderA, LeaderB]),
               LeaderA =:= LeaderB
           end,
           30000),

        %% Deliberate long wait while the cluster stays partitioned.
        ct:pal("Waiting for 2 minutes"),
        timer:sleep(120000),

        ct:pal("Query Khepri status for each RabbitMQ node"),
        PerNodeStatus1 = get_per_node_khepri_status(Config1),
        ct:pal(
          "Per-node Khepri status (during partition):~n~p",
          [PerNodeStatus1]),

        lists:foreach(
          fun(Follower) ->
                  ct:pal(
                    ?LOW_IMPORTANCE,
                    "Unblocking traffic between ~ts and ~ts",
                    [Leader, Follower]),
                  ?assertEqual(
                     ok,
                     proxied_rpc(
                       Config1, Leader,
                       inet_tcp_proxy_dist, allow, [Follower])),
                  ?assertEqual(
                     ok,
                     proxied_rpc(
                       Config1, Follower,
                       inet_tcp_proxy_dist, allow, [Leader]))
          end, Followers),

        ct:pal("Wait for the whole cluster to agree on the same leader"),
        ?awaitMatch(
           true,
           begin
               LeaderA = get_leader_node(Config1, Leader),
               LeaderB = get_leader_node(Config1, Follower1),
               ct:pal("LeaderA: ~0p~nLeaderB: ~0p", [LeaderA, LeaderB]),
               LeaderA =:= LeaderB
           end,
           30000),

        ct:pal("Query Khepri status for each RabbitMQ node"),
        PerNodeStatus2 = get_per_node_khepri_status(Config1),
        ct:pal(
          "Per-node Khepri status (after recovery):~n~p",
          [PerNodeStatus2]),
        ok
    after
        %% Runs on success and on failure alike.
        ct:pal("Restore automatic reconnection on the common_test node"),
        case PrevAutoConnect of
            {ok, PrevValue} ->
                application:set_env(kernel, dist_auto_connect, PrevValue);
            undefined ->
                application:unset_env(kernel, dist_auto_connect)
        end
    end.

%% Runs `Module:Function(Args)' on `Node', going through the proxy peer
%% registered for `Node' under the `proxies' key of `Config'.
%%
%% The call to `rabbit_ct_broker_helpers:rpc/5' is executed on the proxy via
%% `peer:call/4', not on the common_test node. Raises `{badkey, Node}' if
%% there is no proxy for `Node'.
proxied_rpc(Config, Node, Module, Function, Args) ->
    ProxyByNode = ?config(proxies, Config),
    NodeProxy = maps:get(Node, ProxyByNode),
    RpcArgs = [Config, Node, Module, Function, Args],
    peer:call(NodeProxy, rabbit_ct_broker_helpers, rpc, RpcArgs).

%% Asks `Node' (through its proxy) which node it considers the leader of the
%% Khepri store.
%%
%% The store ID is obtained locally with `rabbit_khepri:get_store_id/0'; the
%% lookup itself is `ra_leaderboard:lookup_leader/1' executed on `Node'.
%%
%% Returns `{ok, LeaderNode}' when the lookup yields `{StoreId, LeaderNode}',
%% or `{error, no_leader}' when it yields `undefined'.
get_leader_node(Config, Node) ->
    StoreId = rabbit_khepri:get_store_id(),
    case proxied_rpc(Config, Node, ra_leaderboard, lookup_leader, [StoreId]) of
        undefined ->
            {error, no_leader};
        {StoreId, LeaderNode} ->
            {ok, LeaderNode}
    end.

%% Returns the sorted list made of `Node' itself plus the nodes it reports
%% through `erlang:nodes/0', with the proxy peer's own node name removed.
get_connected_nodes(Config, Node) ->
    NodeProxy = maps:get(Node, ?config(proxies, Config)),
    %% The proxy is connected to `Node' as well; find its name so it can be
    %% excluded from the result.
    ProxyNodename = peer:call(NodeProxy, erlang, node, []),
    Visible = proxied_rpc(Config, Node, erlang, nodes, []),
    WithoutProxy = lists:delete(ProxyNodename, Visible),
    lists:sort([Node | WithoutProxy]).

%% Builds a map from each configured node name to the result of calling
%% `rabbit_khepri:status/0' on that node through its proxy.
get_per_node_khepri_status(Config) ->
    Nodes = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
    StatusPairs = [{Node, proxied_rpc(Config, Node, rabbit_khepri, status, [])}
                   || Node <- Nodes],
    maps:from_list(StatusPairs).

rolling_restart(Config) ->
Nodes = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),

Expand Down
Loading