Skip to content

AMQP 1.0, figure 2.33: refuse link instead of terminating the entire session #14389

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 55 additions & 47 deletions deps/amqp10_client/src/amqp10_client_session.erl
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@
-record(link,
{name :: link_name(),
ref :: link_ref(),
state = detached :: detached | attach_sent | attached | detach_sent,
state = detached :: detached | attach_sent | attached | attach_refused | detach_sent,
notify :: pid(),
output_handle :: output_handle(),
input_handle :: input_handle() | undefined,
Expand Down Expand Up @@ -325,9 +325,11 @@ mapped(cast, #'v1_0.end'{} = End, State) ->
ok = notify_session_ended(End, State),
{stop, normal, State};
mapped(cast, #'v1_0.attach'{name = {utf8, Name},
initial_delivery_count = IDC,
handle = {uint, InHandle},
role = PeerRoleBool,
source = Source,
target = Target,
initial_delivery_count = IDC,
max_message_size = MaybeMaxMessageSize} = Attach,
#state{links = Links, link_index = LinkIndex,
link_handle_index = LHI} = State0) ->
Expand All @@ -339,20 +341,28 @@ mapped(cast, #'v1_0.attach'{name = {utf8, Name},
#{OutHandle := Link0} = Links,
ok = notify_link_attached(Link0, Attach, State0),

{DeliveryCount, MaxMessageSize} =
{LinkState, DeliveryCount, MaxMessageSize} =
case Link0 of
#link{role = sender = OurRole,
delivery_count = DC} ->
LS = case Target of
#'v1_0.target'{} -> attached;
_ -> attach_refused
end,
MSS = case MaybeMaxMessageSize of
{ulong, S} when S > 0 -> S;
_ -> undefined
end,
{DC, MSS};
{LS, DC, MSS};
#link{role = receiver = OurRole,
max_message_size = MSS} ->
{unpack(IDC), MSS}
LS = case Source of
#'v1_0.source'{} -> attached;
_ -> attach_refused
end,
{LS, unpack(IDC), MSS}
end,
Link = Link0#link{state = attached,
Link = Link0#link{state = LinkState,
input_handle = InHandle,
delivery_count = DeliveryCount,
max_message_size = MaxMessageSize},
Expand Down Expand Up @@ -495,50 +505,41 @@ mapped({call, From},
when Window =< 0 ->
{keep_state_and_data, {reply, From, {error, remote_incoming_window_exceeded}}};
mapped({call, From = {Pid, _}},
{transfer, #'v1_0.transfer'{handle = {uint, OutHandle},
delivery_tag = {binary, DeliveryTag},
settled = false} = Transfer0, Sections},
#state{outgoing_delivery_id = DeliveryId, links = Links,
outgoing_unsettled = Unsettled} = State) ->
case Links of
#{OutHandle := #link{input_handle = undefined}} ->
{keep_state_and_data, {reply, From, {error, half_attached}}};
#{OutHandle := #link{link_credit = LC}} when LC =< 0 ->
{keep_state_and_data, {reply, From, {error, insufficient_credit}}};
#{OutHandle := Link = #link{max_message_size = MaxMessageSize,
footer_opt = FooterOpt}} ->
Transfer = Transfer0#'v1_0.transfer'{delivery_id = uint(DeliveryId)},
case send_transfer(Transfer, Sections, FooterOpt, MaxMessageSize, State) of
{ok, NumFrames} ->
State1 = State#state{outgoing_unsettled = Unsettled#{DeliveryId => {DeliveryTag, Pid}}},
{keep_state, book_transfer_send(NumFrames, Link, State1), {reply, From, ok}};
Error ->
{keep_state_and_data, {reply, From, Error}}
end;
_ ->
{keep_state_and_data, {reply, From, {error, link_not_found}}}

end;
mapped({call, From},
{transfer, #'v1_0.transfer'{handle = {uint, OutHandle}} = Transfer0,
Sections}, #state{outgoing_delivery_id = DeliveryId,
links = Links} = State) ->
{transfer,
#'v1_0.transfer'{handle = {uint, OutHandle},
delivery_tag = DeliveryTag,
settled = Settled} = Transfer0,
Sections},
#state{outgoing_delivery_id = DeliveryId,
links = Links,
outgoing_unsettled = Unsettled0} = State0) ->
case Links of
#{OutHandle := #link{state = attach_refused}} ->
{keep_state_and_data, {reply, From, {error, attach_refused}}};
#{OutHandle := #link{input_handle = undefined}} ->
{keep_state_and_data, {reply, From, {error, half_attached}}};
#{OutHandle := #link{link_credit = LC}} when LC =< 0 ->
{keep_state_and_data, {reply, From, {error, insufficient_credit}}};
#{OutHandle := Link = #link{max_message_size = MaxMessageSize,
footer_opt = FooterOpt}} ->
Transfer = Transfer0#'v1_0.transfer'{delivery_id = uint(DeliveryId)},
case send_transfer(Transfer, Sections, FooterOpt, MaxMessageSize, State) of
case send_transfer(Transfer, Sections, FooterOpt, MaxMessageSize, State0) of
{ok, NumFrames} ->
State = case Settled of
true ->
State0;
false ->
{binary, Tag} = DeliveryTag,
Unsettled = Unsettled0#{DeliveryId => {Tag, Pid}},
State0#state{outgoing_unsettled = Unsettled}
end,
{keep_state, book_transfer_send(NumFrames, Link, State), {reply, From, ok}};
Error ->
{keep_state_and_data, {reply, From, Error}}
end;
_ ->
{keep_state_and_data, {reply, From, {error, link_not_found}}}

end;

mapped({call, From},
Expand Down Expand Up @@ -688,21 +689,28 @@ send_flow_link(OutHandle,
never -> never;
_ -> {RenewWhenBelow, Credit}
end,
#{OutHandle := #link{output_handle = H,
#{OutHandle := #link{state = LinkState,
output_handle = H,
role = receiver,
delivery_count = DeliveryCount,
available = Available} = Link} = Links,
Flow1 = Flow0#'v1_0.flow'{
handle = uint(H),
%% "In the event that the receiving link endpoint has not yet seen the
%% initial attach frame from the sender this field MUST NOT be set." [2.7.4]
delivery_count = maybe_uint(DeliveryCount),
available = uint(Available)},
Flow = set_flow_session_fields(Flow1, State),
ok = send(Flow, State),
State#state{links = Links#{OutHandle =>
Link#link{link_credit = Credit,
auto_flow = AutoFlow}}}.
case LinkState of
attach_refused ->
%% We will receive the DETACH frame shortly.
State;
_ ->
Flow1 = Flow0#'v1_0.flow'{
handle = uint(H),
%% "In the event that the receiving link endpoint has not yet seen the
%% initial attach frame from the sender this field MUST NOT be set." [2.7.4]
delivery_count = maybe_uint(DeliveryCount),
available = uint(Available)},
Flow = set_flow_session_fields(Flow1, State),
ok = send(Flow, State),
State#state{links = Links#{OutHandle =>
Link#link{link_credit = Credit,
auto_flow = AutoFlow}}}
end.

send_flow_session(State) ->
Flow = set_flow_session_fields(#'v1_0.flow'{}, State),
Expand Down
55 changes: 52 additions & 3 deletions deps/amqp10_client/test/system_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ groups() ->
]},
{mock, [], [
insufficient_credit,
attach_refused,
incoming_heartbeat,
multi_transfer_without_delivery_id
]}
Expand Down Expand Up @@ -772,11 +773,13 @@ insufficient_credit(Config) ->
outgoing_window = {uint, 1000}}
]}
end,
AttachStep = fun({0 = Ch, #'v1_0.attach'{role = false,
name = Name}, <<>>}) ->
AttachStep = fun({0 = Ch, #'v1_0.attach'{name = Name,
role = false,
target = Target}, <<>>}) ->
{Ch, [#'v1_0.attach'{name = Name,
handle = {uint, 99},
role = true}]}
role = true,
target = Target}]}
end,
Steps = [fun mock_server:recv_amqp_header_step/1,
fun mock_server:send_amqp_header_step/1,
Expand All @@ -799,6 +802,52 @@ insufficient_credit(Config) ->
ok = amqp10_client:close_connection(Connection),
ok.

attach_refused(Config) ->
Hostname = ?config(mock_host, Config),
Port = ?config(mock_port, Config),
OpenStep = fun({0 = Ch, #'v1_0.open'{}, _Pay}) ->
{Ch, [#'v1_0.open'{container_id = {utf8, <<"mock">>}}]}
end,
BeginStep = fun({0 = Ch, #'v1_0.begin'{}, _Pay}) ->
{Ch, [#'v1_0.begin'{remote_channel = {ushort, Ch},
next_outgoing_id = {uint, 1},
incoming_window = {uint, 1000},
outgoing_window = {uint, 1000}}
]}
end,
AttachStep = fun({0 = Ch, #'v1_0.attach'{name = Name,
role = false}, <<>>}) ->
%% We test only the 1st stage of link refusal:
%% Server replies with its local terminus set to null.
%% We omit the 2nd stage (the detach frame).
{Ch, [#'v1_0.attach'{name = Name,
handle = {uint, 99},
role = true,
target = undefined}]}
end,
Steps = [fun mock_server:recv_amqp_header_step/1,
fun mock_server:send_amqp_header_step/1,
mock_server:amqp_step(OpenStep),
mock_server:amqp_step(BeginStep),
mock_server:amqp_step(AttachStep)],

ok = mock_server:set_steps(?config(mock_server, Config), Steps),

Cfg = #{address => Hostname, port => Port, sasl => none, notify => self()},
{ok, Connection} = amqp10_client:open_connection(Cfg),
{ok, Session} = amqp10_client:begin_session_sync(Connection),
{ok, Sender} = amqp10_client:attach_sender_link(Session, <<"mock1-sender">>,
<<"test">>),
await_link(Sender, attached, attached_timeout),
Msg = amqp10_msg:new(<<"mock-tag">>, <<"banana">>, true),
%% We expect that the lib prevents the app from sending messages
%% in this intermediate link refusal state.
?assertEqual({error, attach_refused},
amqp10_client:send_msg(Sender, Msg)),

ok = amqp10_client:end_session(Session),
ok = amqp10_client:close_connection(Connection).

multi_transfer_without_delivery_id(Config) ->
Hostname = ?config(mock_host, Config),
Port = ?config(mock_port, Config),
Expand Down
Loading
Loading