Skip to content

Commit 9156e60

Browse files
committed
Extend and update tests for duplicate unacked messages
The resend_* cases are as before, but there is no more exponential duplication. - On session replacement, there is no duplication at all. - On clean exit, messages are still rerouted to all online sessions, but the duplication prevention allows only one copy of message originally rerouted from a given Pid. The relay_* cases are new, and they ensure that if the user reconnects several times, the messages are not lost.
1 parent 7575c04 commit 9156e60

File tree

1 file changed

+102
-30
lines changed

1 file changed

+102
-30
lines changed

big_tests/tests/sm_SUITE.erl

Lines changed: 102 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,10 @@ parallel_cases() ->
125125
parallel_large_buffer_cases() ->
126126
[resend_unacked_from_stopped_sessions,
127127
resend_unacked_from_terminated_sessions,
128-
resend_unacked_from_replaced_sessions].
128+
resend_unacked_from_replaced_sessions,
129+
relay_unacked_from_stopped_sessions,
130+
relay_unacked_from_terminated_sessions,
131+
relay_unacked_from_replaced_sessions].
129132

130133
parallel_manual_ack_freq_1_cases() ->
131134
[client_acks_more_than_sent,
@@ -591,24 +594,30 @@ resend_more_offline_messages_than_buffer_size(Config) ->
591594

592595
%% Test cases for duplicate buffer
593596

597+
-define(USER_NUM, 4).
598+
594599
resend_unacked_from_stopped_sessions(Config) ->
595600
Texts = [<<"msg-1">>],
596-
{Bob, UserSpecs, Users} = connect_initial_users(Texts, Config),
601+
{Bob, UserSpecs, Users} = connect_initial_users(Config),
597602

598603
%% Bob sends messages to User's bare jid
599604
UserJid = escalus_users:get_jid(Config, hd(UserSpecs)),
600605
sm_helper:send_messages(Bob, UserJid, Texts),
601606
C2SPids = lists:map(fun mongoose_helper:get_session_pid/1, Users),
602607
[sm_helper:wait_for_c2s_unacked_count(Pid, length(Texts)) || Pid <- C2SPids],
603608

604-
%% Each User session checks messages and stops,
605-
%% causing buffer rerouting and message duplication
606-
Funs = [fun() -> User end || User <- Users],
607-
reconnect_and_receive_messages(Funs, Texts). % Reconnection is skipped in this test case
609+
%% Each User's session checks messages and stops, rerouting unacked messages to online sessions.
610+
%% This is why there is one extra copy of each message received in each subsequent iteration.
611+
lists:foreach(fun({N, User}) ->
612+
DuplicatedTexts = lists:append(lists:duplicate(N, Texts)),
613+
receive_unacked_messages(User, DuplicatedTexts),
614+
escalus_connection:stop(User)
615+
end, lists:enumerate(Users)),
616+
escalus_connection:stop(Bob).
608617

609618
resend_unacked_from_terminated_sessions(Config) ->
610619
Texts = [<<"msg-1">>],
611-
{Bob, UserSpecs, Users} = connect_initial_users(Texts, Config),
620+
{Bob, UserSpecs, Users} = connect_initial_users(Config),
612621

613622
%% User disconnects all sessions abruptly
614623
lists:foreach(fun escalus_connection:kill/1, Users),
@@ -621,13 +630,17 @@ resend_unacked_from_terminated_sessions(Config) ->
621630
[sm_helper:wait_for_c2s_unacked_count(Pid, length(Texts)) || Pid <- C2SPids],
622631

623632
%% User replaces each terminated session with a new one,
624-
%% causing buffer rerouting and message duplication
625-
Funs = [fun() -> connect_spec(Spec, session) end || Spec <- UserSpecs],
626-
reconnect_and_receive_messages(Funs, Texts).
633+
%% rerouting unacked messages to the new session
634+
lists:foreach(fun(UserSpec) ->
635+
NewUser = connect_spec(UserSpec, session),
636+
receive_unacked_messages(NewUser, Texts),
637+
escalus_connection:stop(NewUser)
638+
end, UserSpecs),
639+
escalus_connection:stop(Bob).
627640

628641
resend_unacked_from_replaced_sessions(Config) ->
629642
Texts = [<<"msg-1">>],
630-
{Bob, UserSpecs, Users} = connect_initial_users(Texts, Config),
643+
{Bob, UserSpecs, Users} = connect_initial_users(Config),
631644

632645
%% Bob sends messages to User's bare jid
633646
UserJid = escalus_users:get_jid(Config, hd(UserSpecs)),
@@ -636,32 +649,91 @@ resend_unacked_from_replaced_sessions(Config) ->
636649
[sm_helper:wait_for_c2s_unacked_count(Pid, length(Texts)) || Pid <- C2SPids],
637650

638651
%% User replaces each online session with a new one,
639-
%% causing buffer rerouting and message duplication
640-
Funs = [fun() -> connect_spec(Spec, session) end || Spec <- UserSpecs],
641-
reconnect_and_receive_messages(Funs, Texts).
652+
%% rerouting unacked messages to the new session
653+
lists:foreach(fun(UserSpec) ->
654+
NewUser = connect_spec(UserSpec, session),
655+
receive_unacked_messages(NewUser, Texts),
656+
escalus_connection:stop(NewUser)
657+
end, UserSpecs),
658+
escalus_connection:stop(Bob).
642659

643-
connect_initial_users(Texts, Config) ->
644-
Resources = [<<"res-", (integer_to_binary(I))/binary>> || I <- lists:seq(1, 4)],
660+
connect_initial_users(Config) ->
661+
Resources = [<<"res-", (integer_to_binary(I))/binary>> || I <- lists:seq(1, ?USER_NUM)],
645662
Bob = connect_fresh(Config, bob, session),
646663
BasicUserSpec = escalus_fresh:create_fresh_user(Config, ?config(user, Config)),
647664
UserSpecs = [[{resource, Res} | BasicUserSpec] || Res <- Resources],
648665
Users = [connect_spec(Spec, sm_after_session) || Spec <- UserSpecs],
649666
{Bob, UserSpecs, Users}.
650667

651-
%% Reconnect (optionally), receive messages and disconnect each resource cleanly, in sequence
652-
%% There is an issue causing each subsequent session
653-
%% to receive twice as many messages as the previous one, i.e. 1, 2, 4, 8, ...
654-
%% See https://github.com/esl/MongooseIM/pull/4498 for a more detailed description with a diagram
655-
reconnect_and_receive_messages([UserF | Rest], Texts) ->
656-
NewUser = UserF(),
657-
sm_helper:wait_for_messages(NewUser, Texts),
658-
timer:sleep(100), % wait a short time to ensure no extra messages arrive
659-
escalus_assert:has_no_stanzas(NewUser),
660-
escalus_connection:stop(NewUser),
661-
%% Expect duplicated buffer in the next session
662-
reconnect_and_receive_messages(Rest, Texts ++ Texts);
663-
reconnect_and_receive_messages([], _Texts) ->
664-
ok.
668+
relay_unacked_from_stopped_sessions(Config) ->
669+
Texts = [<<"msg-1">>],
670+
Resources = [<<"res-", (integer_to_binary(I))/binary>> || I <- lists:seq(1, ?USER_NUM)],
671+
Bob = connect_fresh(Config, bob, session),
672+
BasicUserSpec = escalus_fresh:create_fresh_user(Config, ?config(user, Config)),
673+
UserSpecs = [[{resource, Res} | BasicUserSpec] || Res <- Resources],
674+
FirstUser = connect_spec(hd(UserSpecs), sm_after_session),
675+
676+
%% Bob sends messages to User's bare jid
677+
UserJid = escalus_users:get_jid(Config, hd(UserSpecs)),
678+
sm_helper:send_messages(Bob, UserJid, Texts),
679+
680+
%% Each User's session resends unacked messages to the next one
681+
RelayF = fun(NextSpec, CurUser) ->
682+
receive_unacked_messages(CurUser, Texts),
683+
NewUser = connect_spec(NextSpec, sm_before_session),
684+
escalus_connection:stop(CurUser),
685+
NewUser
686+
end,
687+
LastUser = lists:foldl(RelayF, FirstUser, tl(UserSpecs)),
688+
escalus_connection:stop(LastUser),
689+
escalus_connection:stop(Bob).
690+
691+
relay_unacked_from_terminated_sessions(Config) ->
692+
Texts = [<<"msg-1">>],
693+
Bob = connect_fresh(Config, bob, session),
694+
UserSpec = escalus_fresh:create_fresh_user(Config, ?config(user, Config)),
695+
FirstUser = connect_spec(UserSpec, sm_after_session),
696+
697+
%% Bob sends messages to User's bare jid
698+
UserJid = escalus_users:get_jid(Config, UserSpec),
699+
sm_helper:send_messages(Bob, UserJid, Texts),
700+
701+
%% Each User's session resends unacked messages to the next one
702+
RelayF = fun(NextSpec, CurUser) ->
703+
receive_unacked_messages(CurUser, Texts),
704+
escalus_connection:kill(CurUser),
705+
C2SPid = mongoose_helper:get_session_pid(CurUser),
706+
sm_helper:wait_until_resume_session(C2SPid),
707+
connect_spec(NextSpec, sm_before_session)
708+
end,
709+
LastUser = lists:foldl(RelayF, FirstUser, lists:duplicate(?USER_NUM - 1, UserSpec)),
710+
escalus_connection:stop(LastUser),
711+
escalus_connection:stop(Bob).
712+
713+
relay_unacked_from_replaced_sessions(Config) ->
714+
Texts = [<<"msg-1">>],
715+
Bob = connect_fresh(Config, bob, session),
716+
UserSpec = escalus_fresh:create_fresh_user(Config, ?config(user, Config)),
717+
FirstUser = connect_spec(UserSpec, sm_after_session),
718+
719+
%% Bob sends messages to User's bare jid
720+
UserJid = escalus_users:get_jid(Config, UserSpec),
721+
sm_helper:send_messages(Bob, UserJid, Texts),
722+
723+
%% Each User's session resends unacked messages to the next one
724+
RelayF = fun(NextSpec, CurUser) ->
725+
receive_unacked_messages(CurUser, Texts),
726+
connect_spec(NextSpec, sm_before_session)
727+
end,
728+
LastUser = lists:foldl(RelayF, FirstUser, lists:duplicate(?USER_NUM - 1, UserSpec)),
729+
escalus_connection:stop(LastUser),
730+
escalus_connection:stop(Bob).
731+
732+
%% Receive expected messages and wait a bit to ensure no extra messages arrive
733+
receive_unacked_messages(User, Texts) ->
734+
sm_helper:wait_for_messages(User, Texts),
735+
timer:sleep(100),
736+
escalus_assert:has_no_stanzas(User).
665737

666738
resend_unacked_on_reconnection(Config) ->
667739
Texts = three_texts(),

0 commit comments

Comments
 (0)