|
169 | 169 | -define(MIN_CHECKPOINT_INTERVAL, 64).
|
170 | 170 | -define(LEADER_HEALTH_CHECK_TIMEOUT, 5_000).
|
171 | 171 | -define(GLOBAL_LEADER_HEALTH_CHECK_TIMEOUT, 60_000).
|
%% Timeout handed to ra:members/2 when querying a quorum queue's members after
%% a leadership change (presumably milliseconds -- confirm against the Ra API).
| 172 | +-define(RA_MEMBERS_TIMEOUT, 30_000). |
172 | 173 |
|
173 | 174 | %%----------- QQ policies ---------------------------------------------------
|
174 | 175 |
|
@@ -1229,7 +1230,6 @@ policy_changed(Q) ->
|
1229 | 1230 | end.
|
1230 | 1231 |
|
1231 | 1232 | -spec cluster_state(Name :: atom()) -> 'down' | 'recovering' | 'running'.
|
1232 |
| - |
1233 | 1233 | cluster_state(Name) ->
|
1234 | 1234 | case whereis(Name) of
|
1235 | 1235 | undefined -> down;
|
@@ -1577,17 +1577,18 @@ grow(Node, VhostSpec, QueueSpec, Strategy, Membership) ->
|
1577 | 1577 | is_match(amqqueue:get_vhost(Q), VhostSpec) andalso
|
1578 | 1578 | is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec) ].
|
1579 | 1579 |
|
1580 |
| --spec transfer_leadership(amqqueue:amqqueue(), node()) -> {migrated, node()} | {not_migrated, atom()}. |
| 1580 | +-spec transfer_leadership(amqqueue:amqqueue(), node()) -> |
| 1581 | + {migrated, node()} | {not_migrated, atom()}. |
1581 | 1582 | transfer_leadership(Q, Destination) ->
|
1582 |
| - {RaName, _} = Pid = amqqueue:get_pid(Q), |
1583 |
| - case ra:transfer_leadership(Pid, {RaName, Destination}) of |
| 1583 | + {RaName, _} = Leader = amqqueue:get_pid(Q), |
| 1584 | + case ra:transfer_leadership(Leader, {RaName, Destination}) of |
1584 | 1585 | ok ->
|
1585 |
| - case ra:members(Pid) of |
1586 |
| - {_, _, {_, NewNode}} -> |
1587 |
| - {migrated, NewNode}; |
1588 |
| - {timeout, _} -> |
1589 |
| - {not_migrated, ra_members_timeout} |
1590 |
| - end; |
| 1586 | + case ra:members(Leader, ?RA_MEMBERS_TIMEOUT) of |
| 1587 | + {_, _, {_, NewNode}} -> |
| 1588 | + {migrated, NewNode}; |
| 1589 | + {timeout, _} -> |
| 1590 | + {not_migrated, ra_members_timeout} |
| 1591 | + end; |
1591 | 1592 | already_leader ->
|
1592 | 1593 | {not_migrated, already_leader};
|
1593 | 1594 | {error, Reason} ->
|
@@ -1750,9 +1751,18 @@ i(memory, Q) when ?is_amqqueue(Q) ->
|
1750 | 1751 | 0
|
1751 | 1752 | end;
|
1752 | 1753 | i(state, Q) when ?is_amqqueue(Q) ->
|
1753 |
| - {Name, Node} = amqqueue:get_pid(Q), |
| 1754 | + {Name, Node} = case find_leader(Q) of |
| 1755 | + undefined -> |
| 1756 | + %% fall back to queue record |
| 1757 | + amqqueue:get_pid(Q); |
| 1758 | + Leader -> |
| 1759 | + Leader |
| 1760 | + end, |
1754 | 1761 | %% Check against the leader or last known leader
|
1755 | 1762 | case erpc_call(Node, ?MODULE, cluster_state, [Name], ?RPC_TIMEOUT) of
|
| 1763 | + {error, {erpc, timeout}} -> |
| 1764 | + %% ?? |
| 1765 | + timeout; |
1756 | 1766 | {error, _} ->
|
1757 | 1767 | down;
|
1758 | 1768 | State ->
|
@@ -2299,27 +2309,50 @@ drain(TransferCandidates) ->
|
%% Moves quorum queue leadership away from this node.
%%
%% With no candidate nodes, only a warning is logged. Otherwise every quorum
%% queue whose leader is local has its local Ra server stopped, which forces
%% the remaining members to elect a new leader. Queues are processed in chunks
%% of 256; after stopping the servers of one chunk we wait for the elections
%% of that chunk before moving on to the next one.
%%
%% Returns `ok'.
transfer_leadership([]) ->
    ?LOG_WARNING("Skipping leadership transfer of quorum queues: no candidate "
                 "(online, not under maintenance) nodes to transfer to!");
transfer_leadership(_CandidateNodes) ->
    %% we only transfer leadership for QQs that have local leaders
    LocalLeaderQueues = rabbit_amqqueue:list_local_leaders(),
    QueuesChunked = ra_lib:lists_chunk(256, LocalLeaderQueues),
    ?LOG_INFO("Will transfer leadership of ~b quorum queues with current leader on this node",
              [length(LocalLeaderQueues)]),
    lists:foreach(fun transfer_leadership_of_chunk/1, QueuesChunked),
    ?LOG_INFO("Leadership transfer for quorum queues hosted on this node has been initiated"),
    ok.

%% Stops the local Ra leaders of one chunk of queues, then waits for each of
%% those queues to report a new leader.
transfer_leadership_of_chunk(Queues) ->
    lists:foreach(fun stop_local_ra_leader/1, Queues),
    %% wait for leader elections before processing next chunk of queues
    lists:foreach(fun wait_for_new_leader/1, Queues).

%% Triggers an election that excludes this node from the list of candidates
%% by simply shutting down its local QQ replica (Ra server).
stop_local_ra_leader(Q) ->
    RaLeader = amqqueue:get_pid(Q),
    ?LOG_DEBUG("Will stop Ra leader ~tp", [RaLeader]),
    case rabbit_quorum_queue:stop_server(RaLeader) of
        ok ->
            ?LOG_DEBUG("Successfully stopped Ra server ~tp", [RaLeader]);
        {error, nodedown} ->
            %% the format string has a ~tp directive, so the server id must
            %% be passed explicitly
            ?LOG_ERROR("Failed to stop Ra server ~tp: target node was reported as down",
                       [RaLeader])
    end,
    ok.

%% Asks the remaining members of the queue, one after another, for the current
%% membership and stops at the first member that answers successfully.
%% We don't do any explicit error handling here as it is more important to
%% make progress: failures are only logged at debug level.
wait_for_new_leader(Q) ->
    {RaName, LeaderNode} = amqqueue:get_pid(Q),
    MemberNodes = lists:delete(LeaderNode, amqqueue:get_nodes(Q)),
    _ = lists:any(
          fun(N) ->
                  case ra:members({RaName, N}, ?RA_MEMBERS_TIMEOUT) of
                      {ok, _, _} ->
                          true;
                      Err ->
                          Name = amqqueue:get_name(Q),
                          %% Name is a resource record and Err an arbitrary
                          %% term: render them with rabbit_misc:rs/1 and ~tp
                          ?LOG_DEBUG("Failed to wait for leader election for ~ts on ~tp Err ~tp",
                                     [rabbit_misc:rs(Name), N, Err]),
                          false
                  end
          end, MemberNodes),
    ok.
2323 | 2356 |
|
2324 | 2357 | %% TODO: I just copied it over; it looks like it was always called inside maintenance, so...
|
2325 | 2358 | -spec stop_local_quorum_queue_followers() -> ok.
|
|
0 commit comments