summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/gm.erl29
-rw-r--r--src/rabbit.erl8
-rw-r--r--src/rabbit_amqqueue_process.erl30
-rw-r--r--src/rabbit_channel.erl95
-rw-r--r--src/rabbit_limiter.erl16
-rw-r--r--src/rabbit_looking_glass.erl20
-rw-r--r--src/rabbit_mirror_queue_slave.erl3
-rw-r--r--src/rabbit_msg_store.erl24
-rw-r--r--src/rabbit_node_monitor.erl12
-rw-r--r--src/rabbit_priority_queue.erl34
-rw-r--r--src/rabbit_queue_consumers.erl6
-rw-r--r--src/rabbit_queue_location_validator.erl5
-rw-r--r--src/rabbit_variable_queue.erl94
-rw-r--r--src/rabbit_version.erl6
-rw-r--r--src/rabbit_vm.erl57
-rw-r--r--src/tcp_listener_sup.erl3
16 files changed, 271 insertions, 171 deletions
diff --git a/src/gm.erl b/src/gm.erl
index 0da190a57d..c53e9bb007 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -399,7 +399,6 @@
-define(FORCE_GC_TIMER, 250).
-define(VERSION_START, 0).
-define(SETS, ordsets).
--define(DICT, orddict).
-record(state,
{ self,
@@ -824,8 +823,8 @@ handle_msg({catchup, Left, MembersStateLeft},
members_state = MembersState })
when MembersState =/= undefined ->
MembersStateLeft1 = build_members_state(MembersStateLeft),
- AllMembers = lists:usort(?DICT:fetch_keys(MembersState) ++
- ?DICT:fetch_keys(MembersStateLeft1)),
+ AllMembers = lists:usort(maps:keys(MembersState) ++
+ maps:keys(MembersStateLeft1)),
{MembersState1, Activity} =
lists:foldl(
fun (Id, MembersStateActivity) ->
@@ -995,21 +994,21 @@ is_member_alias(Member, Self, View) ->
dead_member_id({dead, Member}) -> Member.
store_view_member(VMember = #view_member { id = Id }, {Ver, View}) ->
- {Ver, ?DICT:store(Id, VMember, View)}.
+ {Ver, maps:put(Id, VMember, View)}.
with_view_member(Fun, View, Id) ->
store_view_member(Fun(fetch_view_member(Id, View)), View).
-fetch_view_member(Id, {_Ver, View}) -> ?DICT:fetch(Id, View).
+fetch_view_member(Id, {_Ver, View}) -> maps:get(Id, View).
-find_view_member(Id, {_Ver, View}) -> ?DICT:find(Id, View).
+find_view_member(Id, {_Ver, View}) -> maps:find(Id, View).
-blank_view(Ver) -> {Ver, ?DICT:new()}.
+blank_view(Ver) -> {Ver, maps:new()}.
-alive_view_members({_Ver, View}) -> ?DICT:fetch_keys(View).
+alive_view_members({_Ver, View}) -> maps:keys(View).
all_known_members({_Ver, View}) ->
- ?DICT:fold(
+ maps:fold(
fun (Member, #view_member { aliases = Aliases }, Acc) ->
?SETS:to_list(Aliases) ++ [Member | Acc]
end, [], View).
@@ -1374,24 +1373,24 @@ with_member_acc(Fun, Id, {MembersState, Acc}) ->
{store_member(Id, MemberState, MembersState), Acc1}.
find_member_or_blank(Id, MembersState) ->
- case ?DICT:find(Id, MembersState) of
+ case maps:find(Id, MembersState) of
{ok, Result} -> Result;
error -> blank_member()
end.
-erase_member(Id, MembersState) -> ?DICT:erase(Id, MembersState).
+erase_member(Id, MembersState) -> maps:remove(Id, MembersState).
blank_member() ->
#member { pending_ack = queue:new(), last_pub = -1, last_ack = -1 }.
-blank_member_state() -> ?DICT:new().
+blank_member_state() -> maps:new().
store_member(Id, MemberState, MembersState) ->
- ?DICT:store(Id, MemberState, MembersState).
+ maps:put(Id, MemberState, MembersState).
-prepare_members_state(MembersState) -> ?DICT:to_list(MembersState).
+prepare_members_state(MembersState) -> maps:to_list(MembersState).
-build_members_state(MembersStateList) -> ?DICT:from_list(MembersStateList).
+build_members_state(MembersStateList) -> maps:from_list(MembersStateList).
make_member(GroupName) ->
{case dirty_read_group(GroupName) of
diff --git a/src/rabbit.erl b/src/rabbit.erl
index cc1e0e08c4..0d0ff2f9fc 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -23,6 +23,7 @@
status/0, is_running/0, alarms/0,
is_running/1, environment/0, rotate_logs/0, force_event_refresh/1,
start_fhc/0]).
+
-export([start/2, stop/1, prep_stop/1]).
-export([start_apps/1, start_apps/2, stop_apps/1]).
-export([log_locations/0, config_files/0, decrypt_config/2]). %% for testing and mgmt-agent
@@ -230,6 +231,7 @@
%%----------------------------------------------------------------------------
+-type restart_type() :: 'permanent' | 'transient' | 'temporary'.
%% this really should be an abstract type
-type log_location() :: string().
-type param() :: atom().
@@ -267,7 +269,7 @@
-spec recover() -> 'ok'.
-spec start_apps([app_name()]) -> 'ok'.
-spec start_apps([app_name()],
- #{app_name() => permanent|transient|temporary}) -> 'ok'.
+ #{app_name() => restart_type()}) -> 'ok'.
-spec stop_apps([app_name()]) -> 'ok'.
%%----------------------------------------------------------------------------
@@ -506,7 +508,7 @@ stop_and_halt() ->
start_apps(Apps) ->
start_apps(Apps, #{}).
-start_apps(Apps, AppModes) ->
+start_apps(Apps, RestartTypes) ->
app_utils:load_applications(Apps),
ConfigEntryDecoder = case application:get_env(rabbit, config_entry_decoder) of
@@ -547,7 +549,7 @@ start_apps(Apps, AppModes) ->
end,
ok = app_utils:start_applications(OrderedApps,
handle_app_error(could_not_start),
- AppModes).
+ RestartTypes).
%% This function retrieves the correct IoDevice for requesting
%% input. The problem with using the default IoDevice is that
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index ed88e69378..b51375f9fb 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -22,6 +22,7 @@
-define(SYNC_INTERVAL, 200). %% milliseconds
-define(RAM_DURATION_UPDATE_INTERVAL, 5000).
+-define(CONSUMER_BIAS_RATIO, 2.0). %% i.e. consume 100% faster
-export([info_keys/0]).
@@ -1072,27 +1073,27 @@ emit_consumer_deleted(ChPid, ConsumerTag, QName, ActingUser) ->
%%----------------------------------------------------------------------------
-prioritise_call(Msg, _From, _Len, _State) ->
+prioritise_call(Msg, _From, _Len, State) ->
case Msg of
info -> 9;
{info, _Items} -> 9;
consumers -> 9;
stat -> 7;
- {basic_consume, _, _, _, _, _, _, _, _, _, _} -> 1;
- {basic_cancel, _, _, _} -> 1;
+ {basic_consume, _, _, _, _, _, _, _, _, _} -> consumer_bias(State, 0, 2);
+ {basic_cancel, _, _, _} -> consumer_bias(State, 0, 2);
_ -> 0
end.
-prioritise_cast(Msg, _Len, _State) ->
+prioritise_cast(Msg, _Len, State) ->
case Msg of
delete_immediately -> 8;
{delete_exclusive, _Pid} -> 8;
{set_ram_duration_target, _Duration} -> 8;
{set_maximum_since_use, _Age} -> 8;
{run_backing_queue, _Mod, _Fun} -> 6;
- {ack, _AckTags, _ChPid} -> 3; %% [1]
- {resume, _ChPid} -> 2;
- {notify_sent, _ChPid, _Credit} -> 1;
+ {ack, _AckTags, _ChPid} -> 4; %% [1]
+ {resume, _ChPid} -> 3;
+ {notify_sent, _ChPid, _Credit} -> consumer_bias(State, 0, 2);
_ -> 0
end.
@@ -1104,6 +1105,16 @@ prioritise_cast(Msg, _Len, _State) ->
%% stack are optimised for that) and to make things easier to reason
%% about. Finally, we prioritise ack over resume since it should
%% always reduce memory use.
+%% bump_reduce_memory_use is prioritised over publishes, because sending
+%% credit to self is hard to reason about. Consumers can continue while
+%% reduce_memory_use is in progress.
+
+consumer_bias(#q{backing_queue = BQ, backing_queue_state = BQS}, Low, High) ->
+ case BQ:msg_rates(BQS) of
+ {0.0, _} -> Low;
+ {Ingress, Egress} when Egress / Ingress < ?CONSUMER_BIAS_RATIO -> High;
+ {_, _} -> Low
+ end.
prioritise_info(Msg, _Len, #q{q = #amqqueue{exclusive_owner = DownPid}}) ->
case Msg of
@@ -1113,6 +1124,7 @@ prioritise_info(Msg, _Len, #q{q = #amqqueue{exclusive_owner = DownPid}}) ->
{drop_expired, _Version} -> 8;
emit_stats -> 7;
sync_timeout -> 6;
+ bump_reduce_memory_use -> 1;
_ -> 0
end.
@@ -1503,6 +1515,10 @@ handle_info({bump_credit, Msg}, State = #q{backing_queue = BQ,
%% rabbit_variable_queue:msg_store_write/4.
credit_flow:handle_bump_msg(Msg),
noreply(State#q{backing_queue_state = BQ:resume(BQS)});
+handle_info(bump_reduce_memory_use, State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ put(waiting_bump, false),
+ noreply(State#q{backing_queue_state = BQ:resume(BQS)});
handle_info(Info, State) ->
{stop, {unhandled_info, Info}, State}.
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 1167ac66f9..c671438ce8 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -186,10 +186,21 @@
-define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]).
--define(INCR_STATS(Incs, Measure, State),
+-define(INCR_STATS(Type, Key, Inc, Measure, State),
case rabbit_event:stats_level(State, #ch.stats_timer) of
- fine -> incr_stats(Incs, Measure);
- _ -> ok
+ fine ->
+ rabbit_core_metrics:channel_stats(Type, Measure, {self(), Key}, Inc),
+ %% Keys in the process dictionary are used to clean up the core metrics
+ put({Type, Key}, none);
+ _ ->
+ ok
+ end).
+
+-define(INCR_STATS(Type, Key, Inc, Measure),
+ begin
+ rabbit_core_metrics:channel_stats(Type, Measure, {self(), Key}, Inc),
+ %% Keys in the process dictionary are used to clean up the core metrics
+ put({Type, Key}, none)
end).
%%----------------------------------------------------------------------------
@@ -378,7 +389,16 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
true -> flow;
false -> noflow
end,
-
+ {ok, {Global, Prefetch}} = application:get_env(rabbit, default_consumer_prefetch),
+ Limiter0 = rabbit_limiter:new(LimiterPid),
+ Limiter = case {Global, Prefetch} of
+ {true, 0} ->
+ rabbit_limiter:unlimit_prefetch(Limiter0);
+ {true, _} ->
+ rabbit_limiter:limit_prefetch(Limiter0, Prefetch, 0);
+ _ ->
+ Limiter0
+ end,
State = #ch{state = starting,
protocol = Protocol,
channel = Channel,
@@ -386,7 +406,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
writer_pid = WriterPid,
conn_pid = ConnPid,
conn_name = ConnName,
- limiter = rabbit_limiter:new(LimiterPid),
+ limiter = Limiter,
tx = none,
next_tag = 1,
unacked_message_q = queue:new(),
@@ -407,7 +427,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
mandatory = dtree:empty(),
capabilities = Capabilities,
trace_state = rabbit_trace:init(VHost),
- consumer_prefetch = 0,
+ consumer_prefetch = Prefetch,
reply_consumer = none,
delivery_flow = Flow,
interceptor_state = undefined},
@@ -1258,8 +1278,12 @@ handle_method(#'basic.qos'{prefetch_size = Size}, _, _State) when Size /= 0 ->
"prefetch_size!=0 (~w)", [Size]);
handle_method(#'basic.qos'{global = false,
- prefetch_count = PrefetchCount}, _, State) ->
- {reply, #'basic.qos_ok'{}, State#ch{consumer_prefetch = PrefetchCount}};
+ prefetch_count = PrefetchCount},
+ _, State = #ch{limiter = Limiter}) ->
+ %% Ensures that if default was set, it's overridden
+ Limiter1 = rabbit_limiter:unlimit_prefetch(Limiter),
+ {reply, #'basic.qos_ok'{}, State#ch{consumer_prefetch = PrefetchCount,
+ limiter = Limiter1}};
handle_method(#'basic.qos'{global = true,
prefetch_count = 0},
@@ -1632,7 +1656,7 @@ basic_return(#basic_message{exchange_name = ExchangeName,
content = Content},
State = #ch{protocol = Protocol, writer_pid = WriterPid},
Reason) ->
- ?INCR_STATS([{exchange_stats, ExchangeName, 1}], return_unroutable, State),
+ ?INCR_STATS(exchange_stats, ExchangeName, 1, return_unroutable, State),
{_Close, ReplyCode, ReplyText} = Protocol:lookup_amqp_exception(Reason),
ok = rabbit_writer:send_command(
WriterPid,
@@ -1669,14 +1693,14 @@ record_sent(ConsumerTag, AckRequired,
user = #user{username = Username},
conn_name = ConnName,
channel = ChannelNum}) ->
- ?INCR_STATS([{queue_stats, QName, 1}], case {ConsumerTag, AckRequired} of
- {none, true} -> get;
- {none, false} -> get_no_ack;
- {_ , true} -> deliver;
- {_ , false} -> deliver_no_ack
- end, State),
+ ?INCR_STATS(queue_stats, QName, 1, case {ConsumerTag, AckRequired} of
+ {none, true} -> get;
+ {none, false} -> get_no_ack;
+ {_ , true} -> deliver;
+ {_ , false} -> deliver_no_ack
+ end, State),
case Redelivered of
- true -> ?INCR_STATS([{queue_stats, QName, 1}], redeliver, State);
+ true -> ?INCR_STATS(queue_stats, QName, 1, redeliver, State);
false -> ok
end,
rabbit_trace:tap_out(Msg, ConnName, ChannelNum, Username, TraceState),
@@ -1721,11 +1745,11 @@ ack(Acked, State = #ch{queue_names = QNames}) ->
foreach_per_queue(
fun (QPid, MsgIds) ->
ok = rabbit_amqqueue:ack(QPid, MsgIds, self()),
- ?INCR_STATS(case maps:find(QPid, QNames) of
- {ok, QName} -> Count = length(MsgIds),
- [{queue_stats, QName, Count}];
- error -> []
- end, ack, State)
+ case maps:find(QPid, QNames) of
+ {ok, QName} -> Count = length(MsgIds),
+ ?INCR_STATS(queue_stats, QName, Count, ack, State);
+ error -> ok
+ end
end, Acked),
ok = notify_limiter(State#ch.limiter, Acked).
@@ -1790,7 +1814,7 @@ deliver_to_queues({#delivery{message = #basic_message{exchange_name = XName},
confirm = false,
mandatory = false},
[]}, State) -> %% optimisation
- ?INCR_STATS([{exchange_stats, XName, 1}], publish, State),
+ ?INCR_STATS(exchange_stats, XName, 1, publish, State),
State;
deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
exchange_name = XName},
@@ -1827,11 +1851,15 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
Message, State1),
State3 = process_routing_confirm( Confirm, DeliveredQPids, MsgSeqNo,
XName, State2),
- ?INCR_STATS([{exchange_stats, XName, 1} |
- [{queue_exchange_stats, {QName, XName}, 1} ||
- QPid <- DeliveredQPids,
- {ok, QName} <- [maps:find(QPid, QNames1)]]],
- publish, State3),
+ case rabbit_event:stats_level(State3, #ch.stats_timer) of
+ fine ->
+ ?INCR_STATS(exchange_stats, XName, 1, publish),
+ [?INCR_STATS(queue_exchange_stats, {QName, XName}, 1, publish) ||
+ QPid <- DeliveredQPids,
+ {ok, QName} <- [maps:find(QPid, QNames1)]];
+ _ ->
+ ok
+ end,
State3.
process_routing_mandatory(false, _, _MsgSeqNo, _Msg, State) ->
@@ -1874,7 +1902,7 @@ send_confirms_and_nacks(State = #ch{tx = none, confirmed = C, rejected = R}) ->
ok -> ConfirmMsgSeqNos =
lists:foldl(
fun ({MsgSeqNo, XName}, MSNs) ->
- ?INCR_STATS([{exchange_stats, XName, 1}],
+ ?INCR_STATS(exchange_stats, XName, 1,
confirm, State),
[MsgSeqNo | MSNs]
end, [], lists:append(C)),
@@ -1977,17 +2005,14 @@ i(Item, _) ->
name(#ch{conn_name = ConnName, channel = Channel}) ->
list_to_binary(rabbit_misc:format("~s (~p)", [ConnName, Channel])).
-incr_stats(Incs, Measure) ->
- [begin
- rabbit_core_metrics:channel_stats(Type, Measure, {self(), Key}, Inc),
- %% Keys in the process dictionary are used to clean up the core metrics
- put({Type, Key}, none)
- end || {Type, Key, Inc} <- Incs].
-
emit_stats(State) -> emit_stats(State, []).
emit_stats(State, Extra) ->
[{reductions, Red} | Coarse0] = infos(?STATISTICS_KEYS, State),
+ %% First metric must be `idle_since` (if available), as expected by
+ %% `rabbit_mgmt_format:format_channel_stats`. This is a performance
+ %% optimisation that avoids traversing the whole list when only
+ %% one element has to be formatted.
rabbit_core_metrics:channel_stats(self(), Extra ++ Coarse0),
rabbit_core_metrics:channel_stats(reductions, self(), Red).
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index e81b10a555..93bacc568d 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -184,7 +184,7 @@
%% 'Notify' is a boolean that indicates whether a queue should be
%% notified of a change in the limit or volume that may allow it to
%% deliver more messages via the limiter's channel.
- queues = orddict:new(), % QPid -> {MonitorRef, Notify}
+ queues = maps:new(), % QPid -> {MonitorRef, Notify}
volume = 0}).
%% mode is of type credit_mode()
@@ -402,28 +402,28 @@ prefetch_limit_reached(#lim{prefetch_count = Limit, volume = Volume}) ->
Limit =/= 0 andalso Volume >= Limit.
remember_queue(QPid, State = #lim{queues = Queues}) ->
- case orddict:is_key(QPid, Queues) of
+ case maps:is_key(QPid, Queues) of
false -> MRef = erlang:monitor(process, QPid),
- State#lim{queues = orddict:store(QPid, {MRef, false}, Queues)};
+ State#lim{queues = maps:put(QPid, {MRef, false}, Queues)};
true -> State
end.
forget_queue(QPid, State = #lim{queues = Queues}) ->
- case orddict:find(QPid, Queues) of
+ case maps:find(QPid, Queues) of
{ok, {MRef, _}} -> true = erlang:demonitor(MRef),
- State#lim{queues = orddict:erase(QPid, Queues)};
+ State#lim{queues = maps:remove(QPid, Queues)};
error -> State
end.
limit_queue(QPid, State = #lim{queues = Queues}) ->
UpdateFun = fun ({MRef, _}) -> {MRef, true} end,
- State#lim{queues = orddict:update(QPid, UpdateFun, Queues)}.
+ State#lim{queues = maps:update_with(QPid, UpdateFun, Queues)}.
notify_queues(State = #lim{ch_pid = ChPid, queues = Queues}) ->
{QList, NewQueues} =
- orddict:fold(fun (_QPid, {_, false}, Acc) -> Acc;
+ maps:fold(fun (_QPid, {_, false}, Acc) -> Acc;
(QPid, {MRef, true}, {L, D}) ->
- {[QPid | L], orddict:store(QPid, {MRef, false}, D)}
+ {[QPid | L], maps:put(QPid, {MRef, false}, D)}
end, {[], Queues}, Queues),
case length(QList) of
0 -> ok;
diff --git a/src/rabbit_looking_glass.erl b/src/rabbit_looking_glass.erl
index 702fb41eb8..6cd53dbc23 100644
--- a/src/rabbit_looking_glass.erl
+++ b/src/rabbit_looking_glass.erl
@@ -17,6 +17,7 @@
-module(rabbit_looking_glass).
-ignore_xref([{lg, trace, 4}]).
+-ignore_xref([{maps, from_list, 1}]).
-export([boot/0]).
-export([connections/0]).
@@ -27,12 +28,21 @@ boot() ->
ok;
Value ->
Input = parse_value(Value),
- rabbit_log:info("Enabling Looking Glass profiler, input value: ~p", [Input]),
+ rabbit_log:info(
+ "Enabling Looking Glass profiler, input value: ~p",
+ [Input]
+ ),
{ok, _} = application:ensure_all_started(looking_glass),
- lg:trace(Input, lg_file_tracer, "traces.lz4", #{
- mode => profile,
- running => true,
- send => true})
+ lg:trace(
+ Input,
+ lg_file_tracer,
+ "traces.lz4",
+ maps:from_list([
+ {mode, profile},
+ {running, true},
+ {send, true}]
+ )
+ )
end.
parse_value(Value) ->
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 6139099ed1..65a13f03c0 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -374,6 +374,9 @@ handle_info({bump_credit, Msg}, State) ->
credit_flow:handle_bump_msg(Msg),
noreply(State);
+handle_info(bump_reduce_memory_use, State) ->
+ noreply(State);
+
%% In the event of a short partition during sync we can detect the
%% master's 'death', drop out of sync, and then receive sync messages
%% which were still in flight. Ignore them.
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index fe78075d0f..76976ad771 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -780,7 +780,7 @@ init([Type, BaseDir, ClientRefs, StartupFunState]) ->
sync_timer_ref = undefined,
sum_valid_data = 0,
sum_file_size = 0,
- pending_gc_completion = orddict:new(),
+ pending_gc_completion = maps:new(),
gc_pid = GCPid,
file_handles_ets = FileHandlesEts,
file_summary_ets = FileSummaryEts,
@@ -1269,7 +1269,7 @@ contains_message(MsgId, From,
gen_server2:reply(From, false),
State;
#msg_location { file = File } ->
- case orddict:is_key(File, Pending) of
+ case maps:is_key(File, Pending) of
true -> add_to_pending_gc_completion(
{contains, MsgId, From}, File, State);
false -> gen_server2:reply(From, true),
@@ -1280,16 +1280,16 @@ contains_message(MsgId, From,
add_to_pending_gc_completion(
Op, File, State = #msstate { pending_gc_completion = Pending }) ->
State #msstate { pending_gc_completion =
- rabbit_misc:orddict_cons(File, Op, Pending) }.
+ rabbit_misc:maps_cons(File, Op, Pending) }.
run_pending(Files, State) ->
lists:foldl(
fun (File, State1 = #msstate { pending_gc_completion = Pending }) ->
- Pending1 = orddict:erase(File, Pending),
+ Pending1 = maps:remove(File, Pending),
lists:foldl(
fun run_pending_action/2,
State1 #msstate { pending_gc_completion = Pending1 },
- lists:reverse(orddict:fetch(File, Pending)))
+ lists:reverse(maps:get(File, Pending)))
end, State, Files).
run_pending_action({read, MsgId, From}, State) ->
@@ -1320,9 +1320,9 @@ adjust_valid_total_size(File, Delta, State = #msstate {
[{#file_summary.valid_total_size, Delta}]),
State #msstate { sum_valid_data = SumValid + Delta }.
-orddict_store(Key, Val, Dict) ->
- false = orddict:is_key(Key, Dict),
- orddict:store(Key, Val, Dict).
+maps_store(Key, Val, Dict) ->
+ false = maps:is_key(Key, Dict),
+ maps:put(Key, Val, Dict).
update_pending_confirms(Fun, CRef,
State = #msstate { clients = Clients,
@@ -1860,7 +1860,7 @@ maybe_compact(State = #msstate { sum_valid_data = SumValid,
%% complete traversal of FileSummaryEts.
First = ets:first(FileSummaryEts),
case First =:= '$end_of_table' orelse
- orddict:size(Pending) >= ?MAXIMUM_SIMULTANEOUS_GC_FILES of
+ maps:size(Pending) >= ?MAXIMUM_SIMULTANEOUS_GC_FILES of
true ->
State;
false ->
@@ -1869,8 +1869,8 @@ maybe_compact(State = #msstate { sum_valid_data = SumValid,
not_found ->
State;
{Src, Dst} ->
- Pending1 = orddict_store(Dst, [],
- orddict_store(Src, [], Pending)),
+ Pending1 = maps_store(Dst, [],
+ maps_store(Src, [], Pending)),
State1 = close_handle(Src, close_handle(Dst, State)),
true = ets:update_element(FileSummaryEts, Src,
{#file_summary.locked, true}),
@@ -1926,7 +1926,7 @@ delete_file_if_empty(File, State = #msstate {
0 -> true = ets:update_element(FileSummaryEts, File,
{#file_summary.locked, true}),
ok = rabbit_msg_store_gc:delete(GCPid, File),
- Pending1 = orddict_store(File, [], Pending),
+ Pending1 = maps_store(File, [], Pending),
close_handle(File,
State #msstate { pending_gc_completion = Pending1 });
_ -> State
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index 8bf79f7130..565676af3c 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -350,7 +350,7 @@ init([]) ->
subscribers = pmon:new(),
partitions = [],
guid = rabbit_guid:gen(),
- node_guids = orddict:new(),
+ node_guids = maps:new(),
autoheal = rabbit_autoheal:init()})}.
handle_call(partitions, _From, State = #state{partitions = Partitions}) ->
@@ -405,17 +405,17 @@ handle_cast({node_up, Node, NodeType, GUID},
State = #state{guid = MyGUID,
node_guids = GUIDs}) ->
cast(Node, {announce_guid, node(), MyGUID}),
- GUIDs1 = orddict:store(Node, GUID, GUIDs),
+ GUIDs1 = maps:put(Node, GUID, GUIDs),
handle_cast({node_up, Node, NodeType}, State#state{node_guids = GUIDs1});
handle_cast({announce_guid, Node, GUID}, State = #state{node_guids = GUIDs}) ->
- {noreply, State#state{node_guids = orddict:store(Node, GUID, GUIDs)}};
+ {noreply, State#state{node_guids = maps:put(Node, GUID, GUIDs)}};
handle_cast({check_partial_partition, Node, Rep, NodeGUID, MyGUID, RepGUID},
State = #state{guid = MyGUID,
node_guids = GUIDs}) ->
case lists:member(Node, rabbit_mnesia:cluster_nodes(running)) andalso
- orddict:find(Node, GUIDs) =:= {ok, NodeGUID} of
+ maps:find(Node, GUIDs) =:= {ok, NodeGUID} of
true -> spawn_link( %%[1]
fun () ->
case rpc:call(Node, rabbit, is_running, []) of
@@ -560,10 +560,10 @@ handle_info({nodedown, Node, Info}, State = #state{guid = MyGUID,
cast(N, {check_partial_partition,
Node, node(), DownGUID, CheckGUID, MyGUID})
end,
- case orddict:find(Node, GUIDs) of
+ case maps:find(Node, GUIDs) of
{ok, DownGUID} -> Alive = rabbit_mnesia:cluster_nodes(running)
-- [node(), Node],
- [case orddict:find(N, GUIDs) of
+ [case maps:find(N, GUIDs) of
{ok, CheckGUID} -> Check(N, CheckGUID, DownGUID);
error -> ok
end || N <- Alive];
diff --git a/src/rabbit_priority_queue.erl b/src/rabbit_priority_queue.erl
index 41e65e8a1f..9f9ef5adca 100644
--- a/src/rabbit_priority_queue.erl
+++ b/src/rabbit_priority_queue.erl
@@ -207,13 +207,13 @@ publish(Msg, MsgProps, IsDelivered, ChPid, Flow,
?passthrough1(publish(Msg, MsgProps, IsDelivered, ChPid, Flow, BQS)).
batch_publish(Publishes, ChPid, Flow, State = #state{bq = BQ, bqss = [{MaxP, _} |_]}) ->
- PubDict = partition_publish_batch(Publishes, MaxP),
+ PubMap = partition_publish_batch(Publishes, MaxP),
lists:foldl(
fun ({Priority, Pubs}, St) ->
pick1(fun (_P, BQSN) ->
BQ:batch_publish(Pubs, ChPid, Flow, BQSN)
end, Priority, St)
- end, State, orddict:to_list(PubDict));
+ end, State, maps:to_list(PubMap));
batch_publish(Publishes, ChPid, Flow,
State = #passthrough{bq = BQ, bqs = BQS}) ->
?passthrough1(batch_publish(Publishes, ChPid, Flow, BQS)).
@@ -229,7 +229,7 @@ publish_delivered(Msg, MsgProps, ChPid, Flow,
?passthrough2(publish_delivered(Msg, MsgProps, ChPid, Flow, BQS)).
batch_publish_delivered(Publishes, ChPid, Flow, State = #state{bq = BQ, bqss = [{MaxP, _} |_]}) ->
- PubDict = partition_publish_delivered_batch(Publishes, MaxP),
+ PubMap = partition_publish_delivered_batch(Publishes, MaxP),
{PrioritiesAndAcks, State1} =
lists:foldl(
fun ({Priority, Pubs}, {PriosAndAcks, St}) ->
@@ -241,7 +241,7 @@ batch_publish_delivered(Publishes, ChPid, Flow, State = #state{bq = BQ, bqss = [
{priority_on_acktags(P, AckTags), BQSN1}
end, Priority, St),
{[PriosAndAcks1 | PriosAndAcks], St1}
- end, {[], State}, orddict:to_list(PubDict)),
+ end, {[], State}, maps:to_list(PubMap)),
{lists:reverse(PrioritiesAndAcks), State1};
batch_publish_delivered(Publishes, ChPid, Flow,
State = #passthrough{bq = BQ, bqs = BQS}) ->
@@ -327,7 +327,7 @@ ackfold(MsgFun, Acc, State = #state{bq = BQ}, AckTags) ->
AckTagsByPriority = partition_acktags(AckTags),
fold2(
fun (P, BQSN, AccN) ->
- case orddict:find(P, AckTagsByPriority) of
+ case maps:find(P, AckTagsByPriority) of
{ok, ATagsN} -> {AccN1, BQSN1} =
BQ:ackfold(MsgFun, AccN, BQSN, ATagsN),
{priority_on_acktags(P, AccN1), BQSN1};
@@ -439,7 +439,7 @@ zip_msgs_and_acks(Msgs, AckTags, Accumulator, #state{bqss = [{MaxP, _} |_]}) ->
MsgsByPriority = partition_publish_delivered_batch(Msgs, MaxP),
lists:foldl(fun (Acks, MAs) ->
{P, _AckTag} = hd(Acks),
- Pubs = orddict:fetch(P, MsgsByPriority),
+ Pubs = maps:get(P, MsgsByPriority),
MAs0 = zip_msgs_and_acks(Pubs, Acks),
MAs ++ MAs0
end, Accumulator, AckTags);
@@ -527,7 +527,7 @@ fold_min2(Fun, State) ->
fold_by_acktags2(Fun, AckTags, State) ->
AckTagsByPriority = partition_acktags(AckTags),
fold_append2(fun (P, BQSN) ->
- case orddict:find(P, AckTagsByPriority) of
+ case maps:find(P, AckTagsByPriority) of
{ok, AckTagsN} -> Fun(AckTagsN, BQSN);
error -> {[], BQSN}
end
@@ -597,11 +597,11 @@ partition_publishes(Publishes, ExtractMsg, MaxP) ->
Partitioned =
lists:foldl(fun (Pub, Dict) ->
Msg = ExtractMsg(Pub),
- rabbit_misc:orddict_cons(priority(Msg, MaxP), Pub, Dict)
- end, orddict:new(), Publishes),
- orddict:map(fun (_P, RevPubs) ->
- lists:reverse(RevPubs)
- end, Partitioned).
+ rabbit_misc:maps_cons(priority(Msg, MaxP), Pub, Dict)
+ end, maps:new(), Publishes),
+ maps:map(fun (_P, RevPubs) ->
+ lists:reverse(RevPubs)
+ end, Partitioned).
priority_bq(Priority, [{MaxP, _} | _] = BQSs) ->
@@ -625,14 +625,14 @@ add_maybe_infinity(infinity, _) -> infinity;
add_maybe_infinity(_, infinity) -> infinity;
add_maybe_infinity(A, B) -> A + B.
-partition_acktags(AckTags) -> partition_acktags(AckTags, orddict:new()).
+partition_acktags(AckTags) -> partition_acktags(AckTags, maps:new()).
partition_acktags([], Partitioned) ->
- orddict:map(fun (_P, RevAckTags) ->
- lists:reverse(RevAckTags)
- end, Partitioned);
+ maps:map(fun (_P, RevAckTags) ->
+ lists:reverse(RevAckTags)
+ end, Partitioned);
partition_acktags([{P, AckTag} | Rest], Partitioned) ->
- partition_acktags(Rest, rabbit_misc:orddict_cons(P, AckTag, Partitioned)).
+ partition_acktags(Rest, rabbit_misc:maps_cons(P, AckTag, Partitioned)).
priority_on_acktags(P, AckTags) ->
[case Tag of
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl
index f13a46fcf3..0fe3065fe8 100644
--- a/src/rabbit_queue_consumers.erl
+++ b/src/rabbit_queue_consumers.erl
@@ -254,9 +254,9 @@ subtract_acks(ChPid, AckTags, State) ->
not_found;
C = #cr{acktags = ChAckTags, limiter = Lim} ->
{CTagCounts, AckTags2} = subtract_acks(
- AckTags, [], orddict:new(), ChAckTags),
+ AckTags, [], maps:new(), ChAckTags),
{Unblocked, Lim2} =
- orddict:fold(
+ maps:fold(
fun (CTag, Count, {UnblockedN, LimN}) ->
{Unblocked1, LimN1} =
rabbit_limiter:ack_from_queue(LimN, CTag, Count),
@@ -278,7 +278,7 @@ subtract_acks([T | TL] = AckTags, Prefix, CTagCounts, AckQ) ->
case queue:out(AckQ) of
{{value, {T, CTag}}, QTail} ->
subtract_acks(TL, Prefix,
- orddict:update_counter(CTag, 1, CTagCounts), QTail);
+ maps:update_with(CTag, fun (Old) -> Old + 1 end, 1, CTagCounts), QTail);
{{value, V}, QTail} ->
subtract_acks(AckTags, [V | Prefix], CTagCounts, QTail);
{empty, _} ->
diff --git a/src/rabbit_queue_location_validator.erl b/src/rabbit_queue_location_validator.erl
index 2b0f8c7a0f..e70bcd314a 100644
--- a/src/rabbit_queue_location_validator.erl
+++ b/src/rabbit_queue_location_validator.erl
@@ -54,7 +54,6 @@ module(#amqqueue{} = Q) ->
undefined -> no_location_strategy;
Mode -> module(Mode)
end;
-
module(Strategy) when is_binary(Strategy) ->
case rabbit_registry:binary_to_type(Strategy) of
{error, not_found} -> no_location_strategy;
@@ -68,4 +67,6 @@ module(Strategy) when is_binary(Strategy) ->
_ ->
no_location_strategy
end
- end.
+ end;
+module(Strategy) ->
+ module(rabbit_data_coercion:to_binary(Strategy)).
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index ac3d60bb11..49f4d8d3ed 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -829,7 +829,7 @@ set_ram_duration_target(
(TargetRamCount =/= infinity andalso
TargetRamCount1 >= TargetRamCount) of
true -> State1;
- false -> maybe_reduce_memory_use(State1)
+ false -> reduce_memory_use(State1)
end).
maybe_update_rates(State = #vqstate{ in_counter = InCount,
@@ -911,7 +911,7 @@ timeout(State = #vqstate { index_state = IndexState }) ->
handle_pre_hibernate(State = #vqstate { index_state = IndexState }) ->
State #vqstate { index_state = rabbit_queue_index:flush(IndexState) }.
-resume(State) -> a(maybe_reduce_memory_use(State)).
+resume(State) -> a(reduce_memory_use(State)).
msg_rates(#vqstate { rates = #rates { in = AvgIngressRate,
out = AvgEgressRate } }) ->
@@ -1776,7 +1776,7 @@ remove_queue_entries(Q, DelsAndAcksFun,
State = #vqstate{msg_store_clients = MSCState}) ->
{MsgIdsByStore, Delivers, Acks, State1} =
?QUEUE:foldl(fun remove_queue_entries1/2,
- {orddict:new(), [], [], State}, Q),
+ {maps:new(), [], [], State}, Q),
remove_msgs_by_id(MsgIdsByStore, MSCState),
DelsAndAcksFun(Delivers, Acks, State1).
@@ -1786,7 +1786,7 @@ remove_queue_entries1(
is_persistent = IsPersistent} = MsgStatus,
{MsgIdsByStore, Delivers, Acks, State}) ->
{case MsgInStore of
- true -> rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore);
+ true -> rabbit_misc:maps_cons(IsPersistent, MsgId, MsgIdsByStore);
false -> MsgIdsByStore
end,
cons_if(IndexOnDisk andalso not IsDelivered, SeqId, Delivers),
@@ -2143,27 +2143,27 @@ purge_pending_ack1(State = #vqstate { ram_pending_ack = RPA,
qi_pending_ack = gb_trees:empty()},
{IndexOnDiskSeqIds, MsgIdsByStore, State1}.
-%% MsgIdsByStore is an orddict with two keys:
+%% MsgIdsByStore is a map with two keys:
%%
%% true: holds a list of Persistent Message Ids.
%% false: holds a list of Transient Message Ids.
%%
-%% When we call orddict:to_list/1 we get two sets of msg ids, where
+%% When we call maps:to_list/1 we get two sets of msg ids, where
%% IsPersistent is either true for persistent messages or false for
%% transient ones. The msg_store_remove/3 function takes this boolean
%% flag to determine from which store the messages should be removed
%% from.
remove_msgs_by_id(MsgIdsByStore, MSCState) ->
[ok = msg_store_remove(MSCState, IsPersistent, MsgIds)
- || {IsPersistent, MsgIds} <- orddict:to_list(MsgIdsByStore)].
+ || {IsPersistent, MsgIds} <- maps:to_list(MsgIdsByStore)].
remove_transient_msgs_by_id(MsgIdsByStore, MSCState) ->
- case orddict:find(false, MsgIdsByStore) of
+ case maps:find(false, MsgIdsByStore) of
error -> ok;
{ok, MsgIds} -> ok = msg_store_remove(MSCState, false, MsgIds)
end.
-accumulate_ack_init() -> {[], orddict:new(), []}.
+accumulate_ack_init() -> {[], maps:new(), []}.
accumulate_ack(#msg_status { seq_id = SeqId,
msg_id = MsgId,
@@ -2173,7 +2173,7 @@ accumulate_ack(#msg_status { seq_id = SeqId,
{IndexOnDiskSeqIdsAcc, MsgIdsByStore, AllMsgIds}) ->
{cons_if(IndexOnDisk, SeqId, IndexOnDiskSeqIdsAcc),
case MsgInStore of
- true -> rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore);
+ true -> rabbit_misc:maps_cons(IsPersistent, MsgId, MsgIdsByStore);
false -> MsgIdsByStore
end,
[MsgId | AllMsgIds]}.
@@ -2407,45 +2407,79 @@ reduce_memory_use(State = #vqstate {
out = AvgEgress,
ack_in = AvgAckIngress,
ack_out = AvgAckEgress } }) ->
- State1 = #vqstate { q2 = Q2, q3 = Q3 } =
+ {CreditDiscBound, _} = rabbit_misc:get_env(rabbit,
+ msg_store_credit_disc_bound,
+ ?CREDIT_DISC_BOUND),
+ {NeedResumeA2B, State1} = {_, #vqstate { q2 = Q2, q3 = Q3 }} =
case chunk_size(RamMsgCount + gb_trees:size(RPA), TargetRamCount) of
- 0 -> State;
+ 0 -> {false, State};
%% Reduce memory of pending acks and alphas. The order is
%% determined based on which is growing faster. Whichever
%% comes second may very well get a quota of 0 if the
%% first manages to push out the max number of messages.
- S1 -> Funs = case ((AvgAckIngress - AvgAckEgress) >
+ A2BChunk ->
+ %% In case there are few messages to be sent to a message store
+ %% and many messages to be embedded to the queue index,
+ %% we should limit the number of messages to be flushed
+ %% to avoid blocking the process.
+ A2BChunkActual = case A2BChunk > CreditDiscBound * 2 of
+ true -> CreditDiscBound * 2;
+ false -> A2BChunk
+ end,
+ Funs = case ((AvgAckIngress - AvgAckEgress) >
(AvgIngress - AvgEgress)) of
true -> [fun limit_ram_acks/2,
fun push_alphas_to_betas/2];
false -> [fun push_alphas_to_betas/2,
fun limit_ram_acks/2]
end,
- {_, State2} = lists:foldl(fun (ReduceFun, {QuotaN, StateN}) ->
+ {Quota, State2} = lists:foldl(fun (ReduceFun, {QuotaN, StateN}) ->
ReduceFun(QuotaN, StateN)
- end, {S1, State}, Funs),
- State2
+ end, {A2BChunkActual, State}, Funs),
+ {(Quota == 0) andalso (A2BChunk > A2BChunkActual), State2}
end,
-
- State3 =
+ Permitted = permitted_beta_count(State1),
+ {NeedResumeB2D, State3} =
%% If there are more messages with their queue position held in RAM,
%% a.k.a. betas, in Q2 & Q3 than IoBatchSize,
%% write their queue position to disk, a.k.a. push_betas_to_deltas
case chunk_size(?QUEUE:len(Q2) + ?QUEUE:len(Q3),
- permitted_beta_count(State1)) of
- S2 when S2 >= IoBatchSize ->
- %% There is an implicit, but subtle, upper bound here. We
- %% may shuffle a lot of messages from Q2/3 into delta, but
- %% the number of these that require any disk operation,
- %% namely index writing, i.e. messages that are genuine
- %% betas and not gammas, is bounded by the credit_flow
- %% limiting of the alpha->beta conversion above.
- push_betas_to_deltas(S2, State1);
+ Permitted) of
+ B2DChunk when B2DChunk >= IoBatchSize ->
+ %% Same as for alphas to betas. Limit the number of messages
+ %% to be flushed to disk at once to avoid blocking the process.
+ B2DChunkActual = case B2DChunk > CreditDiscBound * 2 of
+ true -> CreditDiscBound * 2;
+ false -> B2DChunk
+ end,
+ StateBD = push_betas_to_deltas(B2DChunkActual, State1),
+ {B2DChunk > B2DChunkActual, StateBD};
_ ->
- State1
+ {false, State1}
end,
- %% See rabbitmq-server-290 for the reasons behind this GC call.
- garbage_collect(),
+ %% We can be blocked by the credit flow, or limited by a batch size,
+ %% or finished with flushing.
+ %% If blocked by the credit flow - the credit grant will resume processing,
+ %% if limited by a batch - the batch continuation message should be sent.
+ %% The continuation message will be prioritised over publishes,
+ %% but not consumptions, so the queue can make progress.
+ Blocked = credit_flow:blocked(),
+ case {Blocked, NeedResumeA2B orelse NeedResumeB2D} of
+ %% Credit bump will continue paging
+ {true, _} -> ok;
+ %% Finished with paging
+ {false, false} -> ok;
+ %% Planning next batch
+ {false, true} ->
+ %% We don't want to use self-credit-flow, because it's harder to
+ %% reason about. So the process sends a (prioritised) message to
+ %% itself and sets a waiting_bump value to keep the message box clean
+ case get(waiting_bump) of
+ true -> ok;
+ _ -> self() ! bump_reduce_memory_use,
+ put(waiting_bump, true)
+ end
+ end,
State3;
%% When using lazy queues, there are no alphas, so we don't need to
%% call push_alphas_to_betas/2.
diff --git a/src/rabbit_version.erl b/src/rabbit_version.erl
index 3d69040b04..710e89922e 100644
--- a/src/rabbit_version.erl
+++ b/src/rabbit_version.erl
@@ -197,10 +197,10 @@ categorise_by_scope(Version) when is_list(Version) ->
rabbit_misc:all_module_attributes(rabbit_upgrade),
{Name, Scope, _Requires} <- Attributes,
lists:member(Name, Version)],
- orddict:to_list(
+ maps:to_list(
lists:foldl(fun ({Scope, Name}, CatVersion) ->
- rabbit_misc:orddict_cons(Scope, Name, CatVersion)
- end, orddict:new(), Categorised)).
+ rabbit_misc:maps_cons(Scope, Name, CatVersion)
+ end, maps:new(), Categorised)).
dir() -> rabbit_mnesia:dir().
diff --git a/src/rabbit_vm.erl b/src/rabbit_vm.erl
index 88062dc32a..b2474a6b38 100644
--- a/src/rabbit_vm.erl
+++ b/src/rabbit_vm.erl
@@ -51,8 +51,6 @@ memory() ->
0
end,
MgmtDbETS = ets_memory([rabbit_mgmt_storage]),
- VMTotal = vm_memory_monitor:get_process_memory(),
-
[{total, ErlangTotal},
{processes, Processes},
{ets, ETS},
@@ -62,48 +60,59 @@ memory() ->
{system, System}] =
erlang:memory([total, processes, ets, atom, binary, code, system]),
- Unaccounted = case VMTotal - ErlangTotal of
- GTZ when GTZ > 0 -> GTZ;
- _LTZ -> 0
+ Strategy = vm_memory_monitor:get_memory_calculation_strategy(),
+ {Allocated, VMTotal} = case Strategy of
+ erlang -> {ErlangTotal, ErlangTotal};
+ allocated ->
+ Alloc = recon_alloc:memory(allocated),
+ {Alloc, Alloc};
+ rss ->
+ Alloc = recon_alloc:memory(allocated),
+ Vm = vm_memory_monitor:get_process_memory(current),
+ {Alloc, Vm}
end,
+ AllocatedUnused = max(Allocated - ErlangTotal, 0),
+ OSReserved = max(VMTotal - Allocated, 0),
+
OtherProc = Processes
- ConnsReader - ConnsWriter - ConnsChannel - ConnsOther
- Qs - QsSlave - MsgIndexProc - Plugins - MgmtDbProc - MetricsProc,
[
%% Connections
- {connection_readers, ConnsReader},
- {connection_writers, ConnsWriter},
- {connection_channels, ConnsChannel},
- {connection_other, ConnsOther},
+ {connection_readers, ConnsReader},
+ {connection_writers, ConnsWriter},
+ {connection_channels, ConnsChannel},
+ {connection_other, ConnsOther},
%% Queues
- {queue_procs, Qs},
- {queue_slave_procs, QsSlave},
+ {queue_procs, Qs},
+ {queue_slave_procs, QsSlave},
%% Processes
- {plugins, Plugins},
- {other_proc, lists:max([0, OtherProc])}, %% [1]
+ {plugins, Plugins},
+ {other_proc, lists:max([0, OtherProc])}, %% [1]
%% Metrics
- {metrics, MetricsETS + MetricsProc},
- {mgmt_db, MgmtDbETS + MgmtDbProc},
+ {metrics, MetricsETS + MetricsProc},
+ {mgmt_db, MgmtDbETS + MgmtDbProc},
%% ETS
- {mnesia, MnesiaETS},
- {other_ets, ETS - MnesiaETS - MetricsETS - MgmtDbETS - MsgIndexETS},
+ {mnesia, MnesiaETS},
+ {other_ets, ETS - MnesiaETS - MetricsETS - MgmtDbETS - MsgIndexETS},
%% Messages (mostly, some binaries are not messages)
- {binary, Bin},
- {msg_index, MsgIndexETS + MsgIndexProc},
+ {binary, Bin},
+ {msg_index, MsgIndexETS + MsgIndexProc},
%% System
- {code, Code},
- {atom, Atom},
- {other_system, System - ETS - Bin - Code - Atom + Unaccounted},
-
- {total, VMTotal}
+ {code, Code},
+ {atom, Atom},
+ {other_system, System - ETS - Bin - Code - Atom},
+ {allocated_unused, AllocatedUnused},
+ {reserved_unallocated, OSReserved},
+ {total, VMTotal}
].
%% [1] - erlang:memory(processes) can be less than the sum of its
%% parts. Rather than display something nonsensical, just silence any
diff --git a/src/tcp_listener_sup.erl b/src/tcp_listener_sup.erl
index 14654535d6..7cb1214c8e 100644
--- a/src/tcp_listener_sup.erl
+++ b/src/tcp_listener_sup.erl
@@ -49,10 +49,11 @@ start_link(IPAddress, Port, Transport, SocketOpts, ProtoSup, ProtoOpts, OnStartu
init({IPAddress, Port, Transport, SocketOpts, ProtoSup, ProtoOpts, OnStartup, OnShutdown,
ConcurrentAcceptorCount, Label}) ->
{ok, AckTimeout} = application:get_env(rabbit, ssl_handshake_timeout),
+ MaxConnections = rabbit_misc:get_env(rabbit, connection_max, infinity),
{ok, {{one_for_all, 10, 10}, [
ranch:child_spec({acceptor, IPAddress, Port}, ConcurrentAcceptorCount,
Transport, [{port, Port}, {ip, IPAddress},
- {max_connections, infinity},
+ {max_connections, MaxConnections},
{ack_timeout, AckTimeout},
{connection_type, supervisor}|SocketOpts],
ProtoSup, ProtoOpts),