Skip to content

Commit 8360eae

Browse files
committed
Refactor rabbit_channel from gen_server2 to gen_server
1 parent 94baa7c commit 8360eae

File tree

1 file changed

+24
-21
lines changed

1 file changed

+24
-21
lines changed

deps/rabbit/src/rabbit_channel.erl

Lines changed: 24 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@
4242

4343
-include("amqqueue.hrl").
4444

45-
-behaviour(gen_server2).
45+
-behaviour(gen_server).
4646

4747
-export([start_link/11, start_link/12, do/2, do/3, do_flow/3, flush/1, shutdown/1]).
4848
-export([send_command/2]).
@@ -217,6 +217,8 @@
217217
put({Type, Key}, none)
218218
end).
219219

220+
-define(HIBERNATE_AFTER, 6_000).
221+
220222
%%----------------------------------------------------------------------------
221223

222224
-export_type([channel_number/0]).
@@ -254,9 +256,10 @@ start_link(Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User,
254256

255257
start_link(Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User,
256258
VHost, Capabilities, CollectorPid, Limiter, AmqpParams) ->
257-
gen_server2:start_link(
259+
Opts = [{hibernate_after, ?HIBERNATE_AFTER}],
260+
gen_server:start_link(
258261
?MODULE, [Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol,
259-
User, VHost, Capabilities, CollectorPid, Limiter, AmqpParams], []).
262+
User, VHost, Capabilities, CollectorPid, Limiter, AmqpParams], Opts).
260263

261264
-spec do(pid(), rabbit_framing:amqp_method_record()) -> 'ok'.
262265

@@ -282,17 +285,17 @@ do_flow(Pid, Method, Content) ->
282285
-spec flush(pid()) -> 'ok'.
283286

284287
flush(Pid) ->
285-
gen_server2:call(Pid, flush, infinity).
288+
gen_server:call(Pid, flush, infinity).
286289

287290
-spec shutdown(pid()) -> 'ok'.
288291

289292
shutdown(Pid) ->
290-
gen_server2:cast(Pid, terminate).
293+
gen_server:cast(Pid, terminate).
291294

292295
-spec send_command(pid(), rabbit_framing:amqp_method_record()) -> 'ok'.
293296

294297
send_command(Pid, Msg) ->
295-
gen_server2:cast(Pid, {command, Msg}).
298+
gen_server:cast(Pid, {command, Msg}).
296299

297300

298301
-spec deliver_reply(binary(), mc:state()) -> 'ok'.
@@ -324,7 +327,7 @@ deliver_reply_v1(EncodedBin, Message) ->
324327

325328
deliver_reply_local(Pid, Key, Message) ->
326329
case pg_local:in_group(rabbit_channels, Pid) of
327-
true -> gen_server2:cast(Pid, {deliver_reply, Key, Message});
330+
true -> gen_server:cast(Pid, {deliver_reply, Key, Message});
328331
false -> ok
329332
end.
330333

@@ -338,7 +341,7 @@ declare_fast_reply_to(<<"amq.rabbitmq.reply-to.", EncodedBin/binary>>) ->
338341
Msg = {declare_fast_reply_to, Key},
339342
rabbit_misc:with_exit_handler(
340343
rabbit_misc:const(not_found),
341-
fun() -> gen_server2:call(Pid, Msg, infinity) end)
344+
fun() -> gen_server:call(Pid, Msg, infinity) end)
342345
end;
343346
declare_fast_reply_to(_) ->
344347
not_found.
@@ -350,7 +353,7 @@ declare_fast_reply_to_v1(EncodedBin) ->
350353
Msg = {declare_fast_reply_to, V1Key},
351354
rabbit_misc:with_exit_handler(
352355
rabbit_misc:const(not_found),
353-
fun() -> gen_server2:call(V1Pid, Msg, infinity) end);
356+
fun() -> gen_server:call(V1Pid, Msg, infinity) end);
354357
{error, _} ->
355358
not_found
356359
end.
@@ -375,7 +378,7 @@ info_keys() -> ?INFO_KEYS.
375378
info(Pid) ->
376379
{Timeout, Deadline} = get_operation_timeout_and_deadline(),
377380
try
378-
case gen_server2:call(Pid, {info, Deadline}, Timeout) of
381+
case gen_server:call(Pid, {info, Deadline}, Timeout) of
379382
{ok, Res} -> Res;
380383
{error, Error} -> throw(Error)
381384
end
@@ -390,7 +393,7 @@ info(Pid) ->
390393
info(Pid, Items) ->
391394
{Timeout, Deadline} = get_operation_timeout_and_deadline(),
392395
try
393-
case gen_server2:call(Pid, {{info, Items}, Deadline}, Timeout) of
396+
case gen_server:call(Pid, {{info, Items}, Deadline}, Timeout) of
394397
{ok, Res} -> Res;
395398
{error, Error} -> throw(Error)
396399
end
@@ -430,7 +433,7 @@ refresh_config_local() ->
430433
_ = rabbit_misc:upmap(
431434
fun (C) ->
432435
try
433-
gen_server2:call(C, refresh_config, infinity)
436+
gen_server:call(C, refresh_config, infinity)
434437
catch _:Reason ->
435438
rabbit_log:error("Failed to refresh channel config "
436439
"for channel ~tp. Reason ~tp",
@@ -444,7 +447,7 @@ refresh_interceptors() ->
444447
_ = rabbit_misc:upmap(
445448
fun (C) ->
446449
try
447-
gen_server2:call(C, refresh_interceptors, ?REFRESH_TIMEOUT)
450+
gen_server:call(C, refresh_interceptors, ?REFRESH_TIMEOUT)
448451
catch _:Reason ->
449452
rabbit_log:error("Failed to refresh channel interceptors "
450453
"for channel ~tp. Reason ~tp",
@@ -465,11 +468,11 @@ ready_for_close(Pid) ->
465468
% This event is necessary for the stats timer to be initialized with
466469
% the correct values once the management agent has started
467470
force_event_refresh(Ref) ->
468-
[gen_server2:cast(C, {force_event_refresh, Ref}) || C <- list()],
471+
[gen_server:cast(C, {force_event_refresh, Ref}) || C <- list()],
469472
ok.
470473

471474
list_queue_states(Pid) ->
472-
gen_server2:call(Pid, list_queue_states).
475+
gen_server:call(Pid, list_queue_states).
473476

474477
-spec update_user_state(pid(), rabbit_types:auth_user()) -> 'ok' | {error, channel_terminated}.
475478

@@ -485,6 +488,7 @@ update_user_state(Pid, UserState) when is_pid(Pid) ->
485488
init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
486489
Capabilities, CollectorPid, LimiterPid, AmqpParams]) ->
487490
process_flag(trap_exit, true),
491+
process_flag(message_queue_data, off_heap),
488492
?LG_PROCESS_TYPE(channel),
489493
?store_proc_name({ConnName, Channel}),
490494
ok = pg_local:join(rabbit_channels, self()),
@@ -549,8 +553,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
549553
fun() -> emit_stats(State2) end),
550554
put_operation_timeout(),
551555
State3 = init_tick_timer(State2),
552-
{ok, State3, hibernate,
553-
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
556+
{ok, State3}.
554557

555558
prioritise_call(Msg, _From, _Len, _State) ->
556559
case Msg of
@@ -726,7 +729,7 @@ handle_info(emit_stats, State) ->
726729
State1 = rabbit_event:reset_stats_timer(State, #ch.stats_timer),
727730
%% NB: don't call noreply/1 since we don't want to kick off the
728731
%% stats timer.
729-
{noreply, send_confirms_and_nacks(State1), hibernate};
732+
{noreply, send_confirms_and_nacks(State1)};
730733

731734
handle_info({{'DOWN', QName}, _MRef, process, QPid, Reason},
732735
#ch{queue_states = QStates0} = State0) ->
@@ -821,14 +824,14 @@ get_consumer_timeout() ->
821824
end.
822825
%%---------------------------------------------------------------------------
823826

824-
reply(Reply, NewState) -> {reply, Reply, next_state(NewState), hibernate}.
827+
reply(Reply, NewState) -> {reply, Reply, next_state(NewState)}.
825828

826-
noreply(NewState) -> {noreply, next_state(NewState), hibernate}.
829+
noreply(NewState) -> {noreply, next_state(NewState)}.
827830

828831
next_state(State) -> ensure_stats_timer(send_confirms_and_nacks(State)).
829832

830833
noreply_coalesce(#ch{confirmed = [], rejected = []} = State) ->
831-
{noreply, ensure_stats_timer(State), hibernate};
834+
{noreply, ensure_stats_timer(State)};
832835
noreply_coalesce(#ch{} = State) ->
833836
% Immediately process 'timeout' info message
834837
{noreply, ensure_stats_timer(State), 0}.

0 commit comments

Comments
 (0)