Skip to content

Commit ce969f6

Browse files
committed
WIP Refuse link
## What? Refuse or detach the link instead of ending the session. ## Why? Because many errors are scoped to a single terminus, detaching just that link preserves the rest of the session’s links - avoiding needless disruption of other traffic. AMQP 1.0’s error model is hierarchical; RabbitMQ should escalate to ending the session only for session-level faults or if the client keeps using a destroyed link. ## How? Refuse link as per figure 2.33 TODO: * Convert more session-level errors to link-level errors
1 parent e329881 commit ce969f6

File tree

8 files changed

+391
-171
lines changed

8 files changed

+391
-171
lines changed

deps/amqp10_client/src/amqp10_client_session.erl

Lines changed: 55 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@
127127
-record(link,
128128
{name :: link_name(),
129129
ref :: link_ref(),
130-
state = detached :: detached | attach_sent | attached | detach_sent,
130+
state = detached :: detached | attach_sent | attached | attach_refused | detach_sent,
131131
notify :: pid(),
132132
output_handle :: output_handle(),
133133
input_handle :: input_handle() | undefined,
@@ -325,9 +325,11 @@ mapped(cast, #'v1_0.end'{} = End, State) ->
325325
ok = notify_session_ended(End, State),
326326
{stop, normal, State};
327327
mapped(cast, #'v1_0.attach'{name = {utf8, Name},
328-
initial_delivery_count = IDC,
329328
handle = {uint, InHandle},
330329
role = PeerRoleBool,
330+
source = Source,
331+
target = Target,
332+
initial_delivery_count = IDC,
331333
max_message_size = MaybeMaxMessageSize} = Attach,
332334
#state{links = Links, link_index = LinkIndex,
333335
link_handle_index = LHI} = State0) ->
@@ -339,20 +341,28 @@ mapped(cast, #'v1_0.attach'{name = {utf8, Name},
339341
#{OutHandle := Link0} = Links,
340342
ok = notify_link_attached(Link0, Attach, State0),
341343

342-
{DeliveryCount, MaxMessageSize} =
344+
{LinkState, DeliveryCount, MaxMessageSize} =
343345
case Link0 of
344346
#link{role = sender = OurRole,
345347
delivery_count = DC} ->
348+
LS = case Target of
349+
#'v1_0.target'{} -> attached;
350+
_ -> attach_refused
351+
end,
346352
MSS = case MaybeMaxMessageSize of
347353
{ulong, S} when S > 0 -> S;
348354
_ -> undefined
349355
end,
350-
{DC, MSS};
356+
{LS, DC, MSS};
351357
#link{role = receiver = OurRole,
352358
max_message_size = MSS} ->
353-
{unpack(IDC), MSS}
359+
LS = case Source of
360+
#'v1_0.source'{} -> attached;
361+
_ -> attach_refused
362+
end,
363+
{LS, unpack(IDC), MSS}
354364
end,
355-
Link = Link0#link{state = attached,
365+
Link = Link0#link{state = LinkState,
356366
input_handle = InHandle,
357367
delivery_count = DeliveryCount,
358368
max_message_size = MaxMessageSize},
@@ -495,50 +505,41 @@ mapped({call, From},
495505
when Window =< 0 ->
496506
{keep_state_and_data, {reply, From, {error, remote_incoming_window_exceeded}}};
497507
mapped({call, From = {Pid, _}},
498-
{transfer, #'v1_0.transfer'{handle = {uint, OutHandle},
499-
delivery_tag = {binary, DeliveryTag},
500-
settled = false} = Transfer0, Sections},
501-
#state{outgoing_delivery_id = DeliveryId, links = Links,
502-
outgoing_unsettled = Unsettled} = State) ->
503-
case Links of
504-
#{OutHandle := #link{input_handle = undefined}} ->
505-
{keep_state_and_data, {reply, From, {error, half_attached}}};
506-
#{OutHandle := #link{link_credit = LC}} when LC =< 0 ->
507-
{keep_state_and_data, {reply, From, {error, insufficient_credit}}};
508-
#{OutHandle := Link = #link{max_message_size = MaxMessageSize,
509-
footer_opt = FooterOpt}} ->
510-
Transfer = Transfer0#'v1_0.transfer'{delivery_id = uint(DeliveryId)},
511-
case send_transfer(Transfer, Sections, FooterOpt, MaxMessageSize, State) of
512-
{ok, NumFrames} ->
513-
State1 = State#state{outgoing_unsettled = Unsettled#{DeliveryId => {DeliveryTag, Pid}}},
514-
{keep_state, book_transfer_send(NumFrames, Link, State1), {reply, From, ok}};
515-
Error ->
516-
{keep_state_and_data, {reply, From, Error}}
517-
end;
518-
_ ->
519-
{keep_state_and_data, {reply, From, {error, link_not_found}}}
520-
521-
end;
522-
mapped({call, From},
523-
{transfer, #'v1_0.transfer'{handle = {uint, OutHandle}} = Transfer0,
524-
Sections}, #state{outgoing_delivery_id = DeliveryId,
525-
links = Links} = State) ->
508+
{transfer,
509+
#'v1_0.transfer'{handle = {uint, OutHandle},
510+
delivery_tag = DeliveryTag,
511+
settled = Settled} = Transfer0,
512+
Sections},
513+
#state{outgoing_delivery_id = DeliveryId,
514+
links = Links,
515+
outgoing_unsettled = Unsettled0} = State0) ->
526516
case Links of
517+
#{OutHandle := #link{state = attach_refused}} ->
518+
{keep_state_and_data, {reply, From, {error, attach_refused}}};
527519
#{OutHandle := #link{input_handle = undefined}} ->
528520
{keep_state_and_data, {reply, From, {error, half_attached}}};
529521
#{OutHandle := #link{link_credit = LC}} when LC =< 0 ->
530522
{keep_state_and_data, {reply, From, {error, insufficient_credit}}};
531523
#{OutHandle := Link = #link{max_message_size = MaxMessageSize,
532524
footer_opt = FooterOpt}} ->
533525
Transfer = Transfer0#'v1_0.transfer'{delivery_id = uint(DeliveryId)},
534-
case send_transfer(Transfer, Sections, FooterOpt, MaxMessageSize, State) of
526+
case send_transfer(Transfer, Sections, FooterOpt, MaxMessageSize, State0) of
535527
{ok, NumFrames} ->
528+
State = case Settled of
529+
true ->
530+
State0;
531+
false ->
532+
{binary, Tag} = DeliveryTag,
533+
Unsettled = Unsettled0#{DeliveryId => {Tag, Pid}},
534+
State0#state{outgoing_unsettled = Unsettled}
535+
end,
536536
{keep_state, book_transfer_send(NumFrames, Link, State), {reply, From, ok}};
537537
Error ->
538538
{keep_state_and_data, {reply, From, Error}}
539539
end;
540540
_ ->
541541
{keep_state_and_data, {reply, From, {error, link_not_found}}}
542+
542543
end;
543544

544545
mapped({call, From},
@@ -688,21 +689,28 @@ send_flow_link(OutHandle,
688689
never -> never;
689690
_ -> {RenewWhenBelow, Credit}
690691
end,
691-
#{OutHandle := #link{output_handle = H,
692+
#{OutHandle := #link{state = LinkState,
693+
output_handle = H,
692694
role = receiver,
693695
delivery_count = DeliveryCount,
694696
available = Available} = Link} = Links,
695-
Flow1 = Flow0#'v1_0.flow'{
696-
handle = uint(H),
697-
%% "In the event that the receiving link endpoint has not yet seen the
698-
%% initial attach frame from the sender this field MUST NOT be set." [2.7.4]
699-
delivery_count = maybe_uint(DeliveryCount),
700-
available = uint(Available)},
701-
Flow = set_flow_session_fields(Flow1, State),
702-
ok = send(Flow, State),
703-
State#state{links = Links#{OutHandle =>
704-
Link#link{link_credit = Credit,
705-
auto_flow = AutoFlow}}}.
697+
case LinkState of
698+
attach_refused ->
699+
%% We will receive the DETACH frame shortly.
700+
State;
701+
_ ->
702+
Flow1 = Flow0#'v1_0.flow'{
703+
handle = uint(H),
704+
%% "In the event that the receiving link endpoint has not yet seen the
705+
%% initial attach frame from the sender this field MUST NOT be set." [2.7.4]
706+
delivery_count = maybe_uint(DeliveryCount),
707+
available = uint(Available)},
708+
Flow = set_flow_session_fields(Flow1, State),
709+
ok = send(Flow, State),
710+
State#state{links = Links#{OutHandle =>
711+
Link#link{link_credit = Credit,
712+
auto_flow = AutoFlow}}}
713+
end.
706714

707715
send_flow_session(State) ->
708716
Flow = set_flow_session_fields(#'v1_0.flow'{}, State),

deps/amqp10_client/test/system_SUITE.erl

Lines changed: 52 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ groups() ->
5252
]},
5353
{mock, [], [
5454
insufficient_credit,
55+
attach_refused,
5556
incoming_heartbeat,
5657
multi_transfer_without_delivery_id
5758
]}
@@ -772,11 +773,13 @@ insufficient_credit(Config) ->
772773
outgoing_window = {uint, 1000}}
773774
]}
774775
end,
775-
AttachStep = fun({0 = Ch, #'v1_0.attach'{role = false,
776-
name = Name}, <<>>}) ->
776+
AttachStep = fun({0 = Ch, #'v1_0.attach'{name = Name,
777+
role = false,
778+
target = Target}, <<>>}) ->
777779
{Ch, [#'v1_0.attach'{name = Name,
778780
handle = {uint, 99},
779-
role = true}]}
781+
role = true,
782+
target = Target}]}
780783
end,
781784
Steps = [fun mock_server:recv_amqp_header_step/1,
782785
fun mock_server:send_amqp_header_step/1,
@@ -799,6 +802,52 @@ insufficient_credit(Config) ->
799802
ok = amqp10_client:close_connection(Connection),
800803
ok.
801804

805+
attach_refused(Config) ->
806+
Hostname = ?config(mock_host, Config),
807+
Port = ?config(mock_port, Config),
808+
OpenStep = fun({0 = Ch, #'v1_0.open'{}, _Pay}) ->
809+
{Ch, [#'v1_0.open'{container_id = {utf8, <<"mock">>}}]}
810+
end,
811+
BeginStep = fun({0 = Ch, #'v1_0.begin'{}, _Pay}) ->
812+
{Ch, [#'v1_0.begin'{remote_channel = {ushort, Ch},
813+
next_outgoing_id = {uint, 1},
814+
incoming_window = {uint, 1000},
815+
outgoing_window = {uint, 1000}}
816+
]}
817+
end,
818+
AttachStep = fun({0 = Ch, #'v1_0.attach'{name = Name,
819+
role = false}, <<>>}) ->
820+
%% We test only the 1st stage of link refusal:
821+
%% Server replies with its local terminus set to null.
822+
%% We omit the 2nd stage (the detach frame).
823+
{Ch, [#'v1_0.attach'{name = Name,
824+
handle = {uint, 99},
825+
role = true,
826+
target = undefined}]}
827+
end,
828+
Steps = [fun mock_server:recv_amqp_header_step/1,
829+
fun mock_server:send_amqp_header_step/1,
830+
mock_server:amqp_step(OpenStep),
831+
mock_server:amqp_step(BeginStep),
832+
mock_server:amqp_step(AttachStep)],
833+
834+
ok = mock_server:set_steps(?config(mock_server, Config), Steps),
835+
836+
Cfg = #{address => Hostname, port => Port, sasl => none, notify => self()},
837+
{ok, Connection} = amqp10_client:open_connection(Cfg),
838+
{ok, Session} = amqp10_client:begin_session_sync(Connection),
839+
{ok, Sender} = amqp10_client:attach_sender_link(Session, <<"mock1-sender">>,
840+
<<"test">>),
841+
await_link(Sender, attached, attached_timeout),
842+
Msg = amqp10_msg:new(<<"mock-tag">>, <<"banana">>, true),
843+
%% We expect that the lib prevents the app from sending messages
844+
%% in this intermediate link refusal state.
845+
?assertEqual({error, attach_refused},
846+
amqp10_client:send_msg(Sender, Msg)),
847+
848+
ok = amqp10_client:end_session(Session),
849+
ok = amqp10_client:close_connection(Connection).
850+
802851
multi_transfer_without_delivery_id(Config) ->
803852
Hostname = ?config(mock_host, Config),
804853
Port = ?config(mock_port, Config),

0 commit comments

Comments
 (0)