Skip to content

Commit a586668

Browse files
authored
Merge pull request #4496 from esl/debug-stream-management
Deduplicate stream management buffer (solution 2+3)
2 parents 2966576 + 9156e60 commit a586668

File tree

6 files changed

+155
-57
lines changed

6 files changed

+155
-57
lines changed

big_tests/tests/sm_SUITE.erl

Lines changed: 102 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,10 @@ parallel_cases() ->
125125
parallel_large_buffer_cases() ->
126126
[resend_unacked_from_stopped_sessions,
127127
resend_unacked_from_terminated_sessions,
128-
resend_unacked_from_replaced_sessions].
128+
resend_unacked_from_replaced_sessions,
129+
relay_unacked_from_stopped_sessions,
130+
relay_unacked_from_terminated_sessions,
131+
relay_unacked_from_replaced_sessions].
129132

130133
parallel_manual_ack_freq_1_cases() ->
131134
[client_acks_more_than_sent,
@@ -591,24 +594,30 @@ resend_more_offline_messages_than_buffer_size(Config) ->
591594

592595
%% Test cases for duplicate buffer
593596

597+
-define(USER_NUM, 4).
598+
594599
resend_unacked_from_stopped_sessions(Config) ->
595600
Texts = [<<"msg-1">>],
596-
{Bob, UserSpecs, Users} = connect_initial_users(Texts, Config),
601+
{Bob, UserSpecs, Users} = connect_initial_users(Config),
597602

598603
%% Bob sends messages to User's bare jid
599604
UserJid = escalus_users:get_jid(Config, hd(UserSpecs)),
600605
sm_helper:send_messages(Bob, UserJid, Texts),
601606
C2SPids = lists:map(fun mongoose_helper:get_session_pid/1, Users),
602607
[sm_helper:wait_for_c2s_unacked_count(Pid, length(Texts)) || Pid <- C2SPids],
603608

604-
%% Each User session checks messages and stops,
605-
%% causing buffer rerouting and message duplication
606-
Funs = [fun() -> User end || User <- Users],
607-
reconnect_and_receive_messages(Funs, Texts). % Reconnection is skipped in this test case
609+
%% Each User's session checks messages and stops, rerouting unacked messages to online sessions.
610+
%% This is why there is one extra copy of each message received in each subsequent iteration.
611+
lists:foreach(fun({N, User}) ->
612+
DuplicatedTexts = lists:append(lists:duplicate(N, Texts)),
613+
receive_unacked_messages(User, DuplicatedTexts),
614+
escalus_connection:stop(User)
615+
end, lists:enumerate(Users)),
616+
escalus_connection:stop(Bob).
608617

609618
resend_unacked_from_terminated_sessions(Config) ->
610619
Texts = [<<"msg-1">>],
611-
{Bob, UserSpecs, Users} = connect_initial_users(Texts, Config),
620+
{Bob, UserSpecs, Users} = connect_initial_users(Config),
612621

613622
%% User disconnects all sessions abruptly
614623
lists:foreach(fun escalus_connection:kill/1, Users),
@@ -621,13 +630,17 @@ resend_unacked_from_terminated_sessions(Config) ->
621630
[sm_helper:wait_for_c2s_unacked_count(Pid, length(Texts)) || Pid <- C2SPids],
622631

623632
%% User replaces each terminated session with a new one,
624-
%% causing buffer rerouting and message duplication
625-
Funs = [fun() -> connect_spec(Spec, session) end || Spec <- UserSpecs],
626-
reconnect_and_receive_messages(Funs, Texts).
633+
%% rerouting unacked messages to the new session
634+
lists:foreach(fun(UserSpec) ->
635+
NewUser = connect_spec(UserSpec, session),
636+
receive_unacked_messages(NewUser, Texts),
637+
escalus_connection:stop(NewUser)
638+
end, UserSpecs),
639+
escalus_connection:stop(Bob).
627640

628641
resend_unacked_from_replaced_sessions(Config) ->
629642
Texts = [<<"msg-1">>],
630-
{Bob, UserSpecs, Users} = connect_initial_users(Texts, Config),
643+
{Bob, UserSpecs, Users} = connect_initial_users(Config),
631644

632645
%% Bob sends messages to User's bare jid
633646
UserJid = escalus_users:get_jid(Config, hd(UserSpecs)),
@@ -636,32 +649,91 @@ resend_unacked_from_replaced_sessions(Config) ->
636649
[sm_helper:wait_for_c2s_unacked_count(Pid, length(Texts)) || Pid <- C2SPids],
637650

638651
%% User replaces each online session with a new one,
639-
%% causing buffer rerouting and message duplication
640-
Funs = [fun() -> connect_spec(Spec, session) end || Spec <- UserSpecs],
641-
reconnect_and_receive_messages(Funs, Texts).
652+
%% rerouting unacked messages to the new session
653+
lists:foreach(fun(UserSpec) ->
654+
NewUser = connect_spec(UserSpec, session),
655+
receive_unacked_messages(NewUser, Texts),
656+
escalus_connection:stop(NewUser)
657+
end, UserSpecs),
658+
escalus_connection:stop(Bob).
642659

643-
connect_initial_users(Texts, Config) ->
644-
Resources = [<<"res-", (integer_to_binary(I))/binary>> || I <- lists:seq(1, 4)],
660+
connect_initial_users(Config) ->
661+
Resources = [<<"res-", (integer_to_binary(I))/binary>> || I <- lists:seq(1, ?USER_NUM)],
645662
Bob = connect_fresh(Config, bob, session),
646663
BasicUserSpec = escalus_fresh:create_fresh_user(Config, ?config(user, Config)),
647664
UserSpecs = [[{resource, Res} | BasicUserSpec] || Res <- Resources],
648665
Users = [connect_spec(Spec, sm_after_session) || Spec <- UserSpecs],
649666
{Bob, UserSpecs, Users}.
650667

651-
%% Reconnect (optionally), receive messages and disconnect each resource cleanly, in sequence
652-
%% There is an issue causing each subsequent session
653-
%% to receive twice as many messages as the previous one, i.e. 1, 2, 4, 8, ...
654-
%% See https://github.com/esl/MongooseIM/pull/4498 for a more detailed description with a diagram
655-
reconnect_and_receive_messages([UserF | Rest], Texts) ->
656-
NewUser = UserF(),
657-
sm_helper:wait_for_messages(NewUser, Texts),
658-
timer:sleep(100), % wait a short time to ensure no extra messages arrive
659-
escalus_assert:has_no_stanzas(NewUser),
660-
escalus_connection:stop(NewUser),
661-
%% Expect duplicated buffer in the next session
662-
reconnect_and_receive_messages(Rest, Texts ++ Texts);
663-
reconnect_and_receive_messages([], _Texts) ->
664-
ok.
668+
relay_unacked_from_stopped_sessions(Config) ->
669+
Texts = [<<"msg-1">>],
670+
Resources = [<<"res-", (integer_to_binary(I))/binary>> || I <- lists:seq(1, ?USER_NUM)],
671+
Bob = connect_fresh(Config, bob, session),
672+
BasicUserSpec = escalus_fresh:create_fresh_user(Config, ?config(user, Config)),
673+
UserSpecs = [[{resource, Res} | BasicUserSpec] || Res <- Resources],
674+
FirstUser = connect_spec(hd(UserSpecs), sm_after_session),
675+
676+
%% Bob sends messages to User's bare jid
677+
UserJid = escalus_users:get_jid(Config, hd(UserSpecs)),
678+
sm_helper:send_messages(Bob, UserJid, Texts),
679+
680+
%% Each User's session resends unacked messages to the next one
681+
RelayF = fun(NextSpec, CurUser) ->
682+
receive_unacked_messages(CurUser, Texts),
683+
NewUser = connect_spec(NextSpec, sm_before_session),
684+
escalus_connection:stop(CurUser),
685+
NewUser
686+
end,
687+
LastUser = lists:foldl(RelayF, FirstUser, tl(UserSpecs)),
688+
escalus_connection:stop(LastUser),
689+
escalus_connection:stop(Bob).
690+
691+
relay_unacked_from_terminated_sessions(Config) ->
692+
Texts = [<<"msg-1">>],
693+
Bob = connect_fresh(Config, bob, session),
694+
UserSpec = escalus_fresh:create_fresh_user(Config, ?config(user, Config)),
695+
FirstUser = connect_spec(UserSpec, sm_after_session),
696+
697+
%% Bob sends messages to User's bare jid
698+
UserJid = escalus_users:get_jid(Config, UserSpec),
699+
sm_helper:send_messages(Bob, UserJid, Texts),
700+
701+
%% Each User's session resends unacked messages to the next one
702+
RelayF = fun(NextSpec, CurUser) ->
703+
receive_unacked_messages(CurUser, Texts),
704+
escalus_connection:kill(CurUser),
705+
C2SPid = mongoose_helper:get_session_pid(CurUser),
706+
sm_helper:wait_until_resume_session(C2SPid),
707+
connect_spec(NextSpec, sm_before_session)
708+
end,
709+
LastUser = lists:foldl(RelayF, FirstUser, lists:duplicate(?USER_NUM - 1, UserSpec)),
710+
escalus_connection:stop(LastUser),
711+
escalus_connection:stop(Bob).
712+
713+
relay_unacked_from_replaced_sessions(Config) ->
714+
Texts = [<<"msg-1">>],
715+
Bob = connect_fresh(Config, bob, session),
716+
UserSpec = escalus_fresh:create_fresh_user(Config, ?config(user, Config)),
717+
FirstUser = connect_spec(UserSpec, sm_after_session),
718+
719+
%% Bob sends messages to User's bare jid
720+
UserJid = escalus_users:get_jid(Config, UserSpec),
721+
sm_helper:send_messages(Bob, UserJid, Texts),
722+
723+
%% Each User's session resends unacked messages to the next one
724+
RelayF = fun(NextSpec, CurUser) ->
725+
receive_unacked_messages(CurUser, Texts),
726+
connect_spec(NextSpec, sm_before_session)
727+
end,
728+
LastUser = lists:foldl(RelayF, FirstUser, lists:duplicate(?USER_NUM - 1, UserSpec)),
729+
escalus_connection:stop(LastUser),
730+
escalus_connection:stop(Bob).
731+
732+
%% Receive expected messages and wait a bit to ensure no extra messages arrive
733+
receive_unacked_messages(User, Texts) ->
734+
sm_helper:wait_for_messages(User, Texts),
735+
timer:sleep(100),
736+
escalus_assert:has_no_stanzas(User).
665737

666738
resend_unacked_on_reconnection(Config) ->
667739
Texts = three_texts(),

src/c2s/mongoose_c2s.erl

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -662,6 +662,12 @@ maybe_retry_state(StateData = #c2s_data{listener_opts = LOpts}, C2SState) ->
662662
end.
663663

664664
-spec handle_cast(data(), state(), term()) -> fsm_res().
665+
handle_cast(StateData, _C2SState, {exit, {replaced, _} = Reason}) ->
666+
ReasonText = <<"Replaced by new connection">>,
667+
StreamConflict = mongoose_xmpp_errors:stream_conflict(StateData#c2s_data.lang, ReasonText),
668+
send_element_from_server_jid(StateData, StreamConflict),
669+
send_trailer(StateData),
670+
{stop, {shutdown, Reason}};
665671
handle_cast(StateData, _C2SState, {exit, Reason}) when is_binary(Reason) ->
666672
StreamConflict = mongoose_xmpp_errors:stream_conflict(StateData#c2s_data.lang, Reason),
667673
send_element_from_server_jid(StateData, StreamConflict),
@@ -1131,7 +1137,7 @@ start_link(Params, ProcOpts) ->
11311137
stop(Pid, Reason) ->
11321138
gen_statem:cast(Pid, {stop, Reason}).
11331139

1134-
-spec exit(pid(), binary() | atom()) -> ok.
1140+
-spec exit(pid(), binary() | atom() | {replaced, pid()}) -> ok.
11351141
exit(Pid, Reason) ->
11361142
gen_statem:cast(Pid, {exit, Reason}).
11371143

src/ejabberd_sm.erl

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@
112112
-type info() :: #{info_key() => any()}.
113113

114114
-type backend() :: ejabberd_sm_mnesia | ejabberd_sm_redis | ejabberd_sm_cets.
115-
-type close_reason() :: resumed | normal | replaced.
115+
-type close_reason() :: resumed | normal | {replaced, pid()}.
116116
-type info_key() :: atom().
117117

118118
-export_type([session/0,
@@ -193,7 +193,7 @@ make_new_sid() ->
193193
ReplacedPids :: [pid()].
194194
open_session(HostType, SID, JID, Priority, Info) ->
195195
set_session(SID, JID, Priority, Info),
196-
ReplacedPIDs = check_for_sessions_to_replace(HostType, JID),
196+
ReplacedPIDs = check_for_sessions_to_replace(HostType, SID, JID),
197197
mongoose_instrument:execute(sm_session, #{host_type => HostType},
198198
#{jid => JID, logins => 1, count => 1}),
199199
mongoose_hooks:sm_register_connection(HostType, SID, JID, Info),
@@ -871,18 +871,19 @@ is_offline(#jid{luser = LUser, lserver = LServer}) ->
871871
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
872872

873873
%% @doc On new session, check if some existing connections need to be replaced
874-
-spec check_for_sessions_to_replace(HostType, JID) -> ReplacedPids when
874+
-spec check_for_sessions_to_replace(HostType, SID, JID) -> ReplacedPids when
875875
HostType :: mongooseim:host_type(),
876+
SID :: sid(),
876877
JID :: jid:jid(),
877878
ReplacedPids :: [pid()].
878-
check_for_sessions_to_replace(HostType, JID) ->
879+
check_for_sessions_to_replace(HostType, {_, NewPid}, JID) ->
879880
#jid{luser = LUser, lserver = LServer, lresource = LResource} = JID,
880881
Sessions = ejabberd_sm_backend:get_sessions(LUser, LServer),
881882
%% TODO: Depending on how this is executed, there could be an unneeded
882883
%% replacement for max_sessions. We need to check this at some point.
883884
ReplacedRedundantSessions = check_existing_resources(LResource, Sessions),
884885
AllReplacedSessionPids = check_max_sessions(HostType, LUser, LServer, ReplacedRedundantSessions, Sessions),
885-
[mongoose_c2s:exit(Pid, <<"Replaced by new connection">>) || Pid <- AllReplacedSessionPids],
886+
[mongoose_c2s:exit(Pid, {replaced, NewPid}) || Pid <- AllReplacedSessionPids],
886887
AllReplacedSessionPids.
887888

888889
-spec check_existing_resources(LResource, Sessions) ->

src/mod_presence.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -560,7 +560,7 @@ close_session_status(normal) ->
560560
<<>>;
561561
close_session_status({shutdown, retries}) ->
562562
<<"Too many attempts">>;
563-
close_session_status({shutdown, replaced}) ->
563+
close_session_status({shutdown, {replaced, _Pid}}) ->
564564
<<"Replaced by new connection">>;
565565
close_session_status({shutdown, Reason}) when is_atom(Reason) ->
566566
<<"Shutdown by reason: ", (atom_to_binary(Reason))/binary>>;

src/stream_management/mod_stream_management.erl

Lines changed: 38 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@
4343
-include("mongoose_config_spec.hrl").
4444
-define(STREAM_MGMT_H_MAX, (1 bsl 32 - 1)).
4545
-define(CONSTRAINT_CHECK_TIMEOUT, 5000). %% 5 seconds
46-
-define(IS_STREAM_MGMT_STOP(R), R =:= {shutdown, ?MODULE}; R =:= {shutdown, resumed}).
4746
-define(IS_ALLOWED_STATE(S), S =:= wait_for_session_establishment; S =:= session_established).
4847

4948
-record(sm_state, {
@@ -229,8 +228,27 @@ xmpp_presend_element(Acc, #{c2s_data := StateData, c2s_state := C2SState}, _Extr
229228
{#sm_state{buffer_max = no_buffer} = SmState, _} ->
230229
maybe_send_ack_request(Acc, C2SState, SmState);
231230
{SmState, _} ->
232-
Jid = mongoose_c2s:get_jid(StateData),
233-
handle_buffer_and_ack(Acc, C2SState, Jid, SmState)
231+
case prevent_buffer_duplication(Acc, SmState) of
232+
duplicate ->
233+
{stop, Acc};
234+
Acc1 ->
235+
Jid = mongoose_c2s:get_jid(StateData),
236+
handle_buffer_and_ack(Acc1, C2SState, Jid, SmState)
237+
end
238+
end.
239+
240+
-spec prevent_buffer_duplication(mongoose_acc:t(), sm_state()) -> duplicate | mongoose_acc:t().
241+
prevent_buffer_duplication(Acc, #sm_state{buffer = Buffer}) ->
242+
case mongoose_acc:get(?MODULE, buffer_ref, undefined, Acc) of
243+
undefined ->
244+
mongoose_acc:set_permanent(?MODULE, buffer_ref, make_ref(), Acc);
245+
Ref ->
246+
case lists:any(fun(BufAcc) ->
247+
mongoose_acc:get(?MODULE, buffer_ref, undefined, BufAcc) =:= Ref
248+
end, Buffer) of
249+
true -> duplicate;
250+
false -> Acc
251+
end
234252
end.
235253

236254
-spec handle_buffer_and_ack(mongoose_acc:t(), c2s_state(), jid:jid(), sm_state()) ->
@@ -362,44 +380,45 @@ notify_unacknowledged_msg(Acc, Jid) ->
362380

363381
-spec reroute_unacked_messages(mongoose_acc:t(), mongoose_c2s_hooks:params(), gen_hook:extra()) ->
364382
mongoose_c2s_hooks:result().
365-
reroute_unacked_messages(Acc, #{c2s_data := StateData, reason := Reason}, #{host_type := HostType}) ->
383+
reroute_unacked_messages(Acc, #{c2s_state := C2SState, c2s_data := StateData, reason := Reason}, #{host_type := HostType}) ->
366384
MaybeSmState = get_mod_state(StateData),
367-
maybe_handle_stream_mgmt_reroute(Acc, StateData, HostType, Reason, MaybeSmState).
385+
maybe_handle_stream_mgmt_reroute(Acc, C2SState, StateData, HostType, Reason, MaybeSmState).
368386

369387
-spec user_terminate(mongoose_acc:t(), mongoose_c2s_hooks:params(), gen_hook:extra()) ->
370388
mongoose_c2s_hooks:result().
371-
user_terminate(Acc, #{reason := Reason}, _Extra) when ?IS_STREAM_MGMT_STOP(Reason) ->
389+
user_terminate(Acc, #{c2s_state := ?EXT_C2S_STATE(resume_session)}, _Extra) ->
372390
{stop, Acc}; %% We stop here because this termination was triggered internally
373391
user_terminate(Acc, _Params, _Extra) ->
374392
{ok, Acc}.
375393

376394
-spec maybe_handle_stream_mgmt_reroute(
377-
mongoose_acc:t(), mongoose_c2s:data(), mongooseim:host_type(), term(), maybe_sm_state()) ->
395+
mongoose_acc:t(), mongoose_c2s:state(), mongoose_c2s:data(), mongooseim:host_type(), term(), maybe_sm_state()) ->
378396
mongoose_c2s_hooks:result().
379-
maybe_handle_stream_mgmt_reroute(Acc, StateData, HostType, Reason, #sm_state{counter_in = H} = SmState)
380-
when ?IS_STREAM_MGMT_STOP(Reason) ->
397+
maybe_handle_stream_mgmt_reroute(Acc, ?EXT_C2S_STATE(resume_session), StateData, HostType, Reason, #sm_state{counter_in = H} = SmState) ->
381398
Sid = mongoose_c2s:get_sid(StateData),
382399
do_remove_smid(HostType, Sid, H),
383-
NewSmState = handle_user_terminate(SmState, StateData, HostType),
400+
NewSmState = handle_user_terminate(SmState, StateData, HostType, Reason),
384401
{ok, mongoose_c2s_acc:to_acc(Acc, state_mod, {?MODULE, NewSmState})};
385-
maybe_handle_stream_mgmt_reroute(Acc, StateData, HostType, _Reason, #sm_state{} = SmState) ->
386-
NewSmState = handle_user_terminate(SmState, StateData, HostType),
402+
maybe_handle_stream_mgmt_reroute(Acc, _C2SState, StateData, HostType, Reason, #sm_state{} = SmState) ->
403+
NewSmState = handle_user_terminate(SmState, StateData, HostType, Reason),
387404
{ok, mongoose_c2s_acc:to_acc(Acc, state_mod, {?MODULE, NewSmState})};
388-
maybe_handle_stream_mgmt_reroute(Acc, _StateData, _HostType, _Reason, {error, not_found}) ->
405+
maybe_handle_stream_mgmt_reroute(Acc, _C2SState,_StateData, _HostType, _Reason, {error, not_found}) ->
389406
{ok, Acc}.
390407

391-
-spec handle_user_terminate(sm_state(), mongoose_c2s:data(), mongooseim:host_type()) -> sm_state().
392-
handle_user_terminate(#sm_state{counter_in = H} = SmState, StateData, HostType) ->
408+
-spec handle_user_terminate(sm_state(), mongoose_c2s:data(), mongooseim:host_type(), term()) -> sm_state().
409+
handle_user_terminate(#sm_state{counter_in = H} = SmState, StateData, HostType, Reason) ->
393410
Sid = mongoose_c2s:get_sid(StateData),
394411
do_remove_smid(HostType, Sid, H),
395412
FromServer = mongoose_c2s:get_lserver(StateData),
396413
NewState = add_delay_elements_to_buffer(SmState, FromServer),
397-
reroute_buffer(StateData, NewState),
414+
reroute_buffer(StateData, NewState, Reason),
398415
SmState#sm_state{buffer = [], buffer_size = 0}.
399416

400-
reroute_buffer(StateData, #sm_state{buffer = Buffer, peer = {gen_statem, {Pid, _}}}) ->
417+
reroute_buffer(StateData, #sm_state{buffer = Buffer, peer = {gen_statem, {Pid, _}}}, _Reason) ->
418+
mongoose_c2s:reroute_buffer_to_pid(StateData, Pid, Buffer);
419+
reroute_buffer(StateData, #sm_state{buffer = Buffer}, {shutdown, {replaced, Pid}}) ->
401420
mongoose_c2s:reroute_buffer_to_pid(StateData, Pid, Buffer);
402-
reroute_buffer(StateData, #sm_state{buffer = Buffer}) ->
421+
reroute_buffer(StateData, #sm_state{buffer = Buffer}, _Reason) ->
403422
mongoose_c2s:reroute_buffer(StateData, Buffer).
404423

405424
add_delay_elements_to_buffer(#sm_state{buffer = Buffer} = SmState, FromServer) ->
@@ -410,7 +429,7 @@ add_delay_elements_to_buffer(#sm_state{buffer = Buffer} = SmState, FromServer) -
410429
terminate(Reason, C2SState, StateData) ->
411430
?LOG_DEBUG(#{what => stream_mgmt_statem_terminate, reason => Reason,
412431
c2s_state => C2SState, c2s_data => StateData}),
413-
mongoose_c2s:terminate({shutdown, ?MODULE}, C2SState, StateData).
432+
mongoose_c2s:terminate(Reason, C2SState, StateData).
414433

415434
-spec handle_stream_mgmt(mongoose_acc:t(), mongoose_c2s_hooks:params(), exml:element()) ->
416435
mongoose_c2s_hooks:result().

test/ejabberd_sm_SUITE.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -366,7 +366,7 @@ too_many_sessions(_C) ->
366366
[given_session_opened(Sid, USR) || {Sid, USR} <- UserSessions],
367367

368368
receive
369-
{forwarded, _Sid, {'$gen_cast', {exit, <<"Replaced by new connection">>}}} ->
369+
{forwarded, _Sid, {'$gen_cast', {exit, {replaced, _Pid}}}} ->
370370
ok;
371371
Message ->
372372
ct:fail("Unexpected message: ~p", [Message])

0 commit comments

Comments
 (0)