Skip to content

Commit b2db635

Browse files
committed
Support handle-max
## What? 1. Support `handle-max` field in the AMQP 1.0 `begin` frame 2. Add a new setting `link_max_per_session` which defaults to 256. 3. Rename `session_max` to `session_max_per_connection` ## Why? 1. Operators might want to limit the number of links per session. A similar setting `consumer_max_per_channel` exists for AMQP 0.9.1. 2. We should use RabbitMQ 4.0 as an opportunity to set a sensible default as to how many links can be active on a given session simultaneously. The session code does iterate over every link in some scenarios (e.g. queue was deleted). At some point, it's better to just open 2nd session instead of attaching hundreds or thousands of links to a single session. A default `link_max_per_session` of 256 should be more than enough given that `session_max_per_connection` is 64. So, the defaults allow `256 * 64 = 16,384` links to be active on an AMQP 1.0 connection. (Operators might want to lower both defaults.) 3. The name is clearer given that we might introduce `session_max_per_node` in the future since `channel_max_per_node` exists for AMQP 0.9.1. ### Additional Context > Link handles MAY be reused once a link is closed for both send and receive. > To make it easier to monitor AMQP link attach frames, it is RECOMMENDED that > implementations always assign the lowest available handle to this field.
1 parent 1fb2206 commit b2db635

File tree

8 files changed

+382
-316
lines changed

8 files changed

+382
-316
lines changed

deps/rabbit/BUILD.bazel

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,8 @@ _APP_ENV = """[
4343
{frame_max, 131072},
4444
%% see rabbitmq-server#1593
4545
{channel_max, 2047},
46-
{session_max, 64},
46+
{session_max_per_connection, 64},
47+
{link_max_per_session, 256},
4748
{ranch_connection_max, infinity},
4849
{heartbeat, 60},
4950
{msg_store_file_size_limit, 16777216},

deps/rabbit/Makefile

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,8 @@ define PROJECT_ENV
2323
{frame_max, 131072},
2424
%% see rabbitmq-server#1593
2525
{channel_max, 2047},
26-
{session_max, 64},
26+
{session_max_per_connection, 64},
27+
{link_max_per_session, 256},
2728
{ranch_connection_max, infinity},
2829
{heartbeat, 60},
2930
{msg_store_file_size_limit, 16777216},

deps/rabbit/priv/schema/rabbit.schema

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -951,9 +951,16 @@ end}.
951951
%% Sets the maximum number of AMQP 1.0 sessions that can be simultaneously
952952
%% active on an AMQP 1.0 connection.
953953
%%
954-
%% {session_max, 1},
955-
{mapping, "session_max", "rabbit.session_max",
956-
[{datatype, integer}, {validators, ["positive_16_bit_integer"]}]}.
954+
%% {session_max_per_connection, 1},
955+
{mapping, "session_max_per_connection", "rabbit.session_max_per_connection",
956+
[{datatype, integer}, {validators, ["positive_16_bit_unsigned_integer"]}]}.
957+
958+
%% Sets the maximum number of AMQP 1.0 links that can be simultaneously
959+
%% active on an AMQP 1.0 session.
960+
%%
961+
%% {link_max_per_session, 10},
962+
{mapping, "link_max_per_session", "rabbit.link_max_per_session",
963+
[{datatype, integer}, {validators, ["positive_32_bit_unsigned_integer"]}]}.
957964

958965
%% Set the max permissible number of client connections per node.
959966
%% `infinity` means "no limit".
@@ -2436,7 +2443,7 @@ end}.
24362443

24372444
{mapping, "raft.segment_max_entries", "ra.segment_max_entries", [
24382445
{datatype, integer},
2439-
{validators, ["non_zero_positive_integer", "positive_16_bit_integer"]}
2446+
{validators, ["non_zero_positive_integer", "positive_16_bit_unsigned_integer"]}
24402447
]}.
24412448

24422449
{translation, "ra.segment_max_entries",
@@ -2743,10 +2750,15 @@ fun(Int) when is_integer(Int) ->
27432750
Int >= 1
27442751
end}.
27452752

2746-
{validator, "positive_16_bit_integer", "number should be between 1 and 65535",
2747-
fun(Int) when is_integer(Int) ->
2748-
(Int >= 1) and (Int =< 65535)
2749-
end}.
2753+
{validator, "positive_16_bit_unsigned_integer", "number should be between 1 and 65535",
2754+
fun(Int) when is_integer(Int) ->
2755+
(Int >= 1) and (Int =< 16#ff_ff)
2756+
end}.
2757+
2758+
{validator, "positive_32_bit_unsigned_integer", "number should be between 1 and 4294967295",
2759+
fun(Int) when is_integer(Int) ->
2760+
(Int >= 1) and (Int =< 16#ff_ff_ff_ff)
2761+
end}.
27502762

27512763
{validator, "valid_regex", "string must be a valid regular expression",
27522764
fun("") -> false;

deps/rabbit/src/rabbit_amqp_reader.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -476,7 +476,7 @@ handle_connection_frame(
476476
SendTimeoutSec, SendFun,
477477
ReceiveTimeoutSec, ReceiveFun),
478478
{ok, IncomingMaxFrameSize} = application:get_env(rabbit, frame_max),
479-
{ok, SessionMax} = application:get_env(rabbit, session_max),
479+
{ok, SessionMax} = application:get_env(rabbit, session_max_per_connection),
480480
%% "The channel-max value is the highest channel number that can be used on the connection.
481481
%% This value plus one is the maximum number of sessions that can be simultaneously active
482482
%% on the connection." [2.7.1]

deps/rabbit/src/rabbit_amqp_session.erl

Lines changed: 296 additions & 279 deletions
Large diffs are not rendered by default.

deps/rabbit/test/amqp_client_SUITE.erl

Lines changed: 45 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,8 @@ groups() ->
141141
incoming_window_closed_rabbitmq_internal_flow_quorum_queue,
142142
tcp_back_pressure_rabbitmq_internal_flow_classic_queue,
143143
tcp_back_pressure_rabbitmq_internal_flow_quorum_queue,
144-
session_max
144+
session_max_per_connection,
145+
link_max_per_session
145146
]},
146147

147148
{cluster_size_3, [shuffle],
@@ -3350,17 +3351,7 @@ async_notify(SenderSettleMode, QType, Config) ->
33503351
flush(settled),
33513352
ok = detach_link_sync(Sender),
33523353

3353-
case QType of
3354-
<<"stream">> ->
3355-
%% If it is a stream we need to wait until there is a local member
3356-
%% on the node we want to subscibe from before proceeding.
3357-
rabbit_ct_helpers:await_condition(
3358-
fun() -> rpc(Config, 0, ?MODULE, has_local_member,
3359-
[rabbit_misc:r(<<"/">>, queue, QName)])
3360-
end, 30_000);
3361-
_ ->
3362-
ok
3363-
end,
3354+
ok = wait_for_local_member(QType, QName, Config),
33643355
Filter = consume_from_first(QType),
33653356
{ok, Receiver} = amqp10_client:attach_receiver_link(
33663357
Session, <<"test-receiver">>, Address,
@@ -3638,10 +3629,7 @@ leader_transfer_credit(QName, QType, Credit, Config) ->
36383629
ok = wait_for_accepts(NumMsgs),
36393630
ok = detach_link_sync(Sender),
36403631

3641-
%% Wait a bit to avoid the following error when attaching:
3642-
%% "stream queue <name> does not have a running replica on the local node"
3643-
timer:sleep(50),
3644-
3632+
ok = wait_for_local_member(QType, QName, Config),
36453633
Filter = consume_from_first(QType),
36463634
{ok, Receiver} = amqp10_client:attach_receiver_link(
36473635
Session0, <<"receiver">>, Address,
@@ -5666,15 +5654,18 @@ tcp_back_pressure_rabbitmq_internal_flow(QType, Config) ->
56665654
ok = end_session_sync(Session),
56675655
ok = amqp10_client:close_connection(Connection).
56685656

5669-
session_max(Config) ->
5657+
session_max_per_connection(Config) ->
56705658
App = rabbit,
5671-
Par = session_max,
5659+
Par = session_max_per_connection,
56725660
{ok, Default} = rpc(Config, application, get_env, [App, Par]),
56735661
%% Let's allow only 1 session per connection.
56745662
ok = rpc(Config, application, set_env, [App, Par, 1]),
56755663

56765664
OpnConf = connection_config(Config),
56775665
{ok, Connection} = amqp10_client:open_connection(OpnConf),
5666+
receive {amqp10_event, {connection, Connection, opened}} -> ok
5667+
after 5000 -> ct:fail(opened_timeout)
5668+
end,
56785669
%% The 1st session should succeed.
56795670
{ok, _Session1} = amqp10_client:begin_session_sync(Connection),
56805671
%% The 2nd session should fail.
@@ -5688,6 +5679,32 @@ session_max(Config) ->
56885679

56895680
ok = rpc(Config, application, set_env, [App, Par, Default]).
56905681

5682+
link_max_per_session(Config) ->
5683+
App = rabbit,
5684+
Par = link_max_per_session,
5685+
{ok, Default} = rpc(Config, application, get_env, [App, Par]),
5686+
%% Let's allow only 1 link per session.
5687+
ok = rpc(Config, application, set_env, [App, Par, 1]),
5688+
5689+
OpnConf = connection_config(Config),
5690+
{ok, Connection} = amqp10_client:open_connection(OpnConf),
5691+
receive {amqp10_event, {connection, Connection, opened}} -> ok
5692+
after 5000 -> ct:fail(opened_timeout)
5693+
end,
5694+
{ok, Session} = amqp10_client:begin_session_sync(Connection),
5695+
Address1 = rabbitmq_amqp_address:exchange(<<"amq.direct">>, <<"k1">>),
5696+
Address2 = rabbitmq_amqp_address:exchange(<<"amq.direct">>, <<"k2">>),
5697+
%% The 1st link should succeed.
5698+
{ok, Link1} = amqp10_client:attach_sender_link_sync(Session, <<"link-1">>, Address1),
5699+
ok = wait_for_credit(Link1),
5700+
%% Since the 2nd link should fail, we expect our session process to die.
5701+
?assert(is_process_alive(Session)),
5702+
{ok, _Link2} = amqp10_client:attach_sender_link(Session, <<"link-2">>, Address2),
5703+
eventually(?_assertNot(is_process_alive(Session))),
5704+
5705+
flush(test_succeeded),
5706+
ok = rpc(Config, application, set_env, [App, Par, Default]).
5707+
56915708
%% internal
56925709
%%
56935710

@@ -5985,6 +6002,16 @@ ready_messages(QName, Config)
59856002
ra_name(Q) ->
59866003
binary_to_atom(<<"%2F_", Q/binary>>).
59876004

6005+
wait_for_local_member(<<"stream">>, QName, Config) ->
6006+
%% If it is a stream we need to wait until there is a local member
6007+
%% on the node we want to subscribe from before proceeding.
6008+
rabbit_ct_helpers:await_condition(
6009+
fun() -> rpc(Config, 0, ?MODULE, has_local_member,
6010+
[rabbit_misc:r(<<"/">>, queue, QName)])
6011+
end, 30_000);
6012+
wait_for_local_member(_, _, _) ->
6013+
ok.
6014+
59886015
has_local_member(QName) ->
59896016
case rabbit_amqqueue:lookup(QName) of
59906017
{ok, Q} ->

deps/rabbit/test/amqp_credit_api_v2_SUITE.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,15 +97,15 @@ credit_api_v2(Config) ->
9797
ok = amqp10_client:detach_link(QQSender),
9898

9999
%% Consume with credit API v1
100-
CQAttachArgs = #{handle => 300,
100+
CQAttachArgs = #{handle => 100,
101101
name => <<"cq receiver 1">>,
102102
role => {receiver, #{address => CQAddr,
103103
durable => configuration}, self()},
104104
snd_settle_mode => unsettled,
105105
rcv_settle_mode => first,
106106
filter => #{}},
107107
{ok, CQReceiver1} = amqp10_client:attach_link(Session, CQAttachArgs),
108-
QQAttachArgs = #{handle => 400,
108+
QQAttachArgs = #{handle => 200,
109109
name => <<"qq receiver 1">>,
110110
role => {receiver, #{address => QQAddr,
111111
durable => configuration}, self()},

deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -429,13 +429,21 @@ tcp_listen_options.exit_on_close = false",
429429
"channel_max_per_node = infinity",
430430
[{rabbit,[{channel_max_per_node, infinity}]}],
431431
[]},
432-
{session_max_1,
433-
"session_max = 1",
434-
[{rabbit,[{session_max, 1}]}],
432+
{session_max_per_connection_1,
433+
"session_max_per_connection = 1",
434+
[{rabbit,[{session_max_per_connection, 1}]}],
435435
[]},
436-
{session_max,
437-
"session_max = 65000",
438-
[{rabbit,[{session_max, 65000}]}],
436+
{session_max_per_connection,
437+
"session_max_per_connection = 65000",
438+
[{rabbit,[{session_max_per_connection, 65_000}]}],
439+
[]},
440+
{link_max_per_session_1,
441+
"link_max_per_session = 1",
442+
[{rabbit,[{link_max_per_session, 1}]}],
443+
[]},
444+
{link_max_per_session,
445+
"link_max_per_session = 4200000000",
446+
[{rabbit,[{link_max_per_session, 4_200_000_000}]}],
439447
[]},
440448
{consumer_max_per_channel,
441449
"consumer_max_per_channel = 16",

0 commit comments

Comments
 (0)