diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/gm.erl | 29 | ||||
| -rw-r--r-- | src/rabbit.erl | 8 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 30 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 95 | ||||
| -rw-r--r-- | src/rabbit_limiter.erl | 16 | ||||
| -rw-r--r-- | src/rabbit_looking_glass.erl | 20 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 3 | ||||
| -rw-r--r-- | src/rabbit_msg_store.erl | 24 | ||||
| -rw-r--r-- | src/rabbit_node_monitor.erl | 12 | ||||
| -rw-r--r-- | src/rabbit_priority_queue.erl | 34 | ||||
| -rw-r--r-- | src/rabbit_queue_consumers.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_queue_location_validator.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 94 | ||||
| -rw-r--r-- | src/rabbit_version.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_vm.erl | 57 | ||||
| -rw-r--r-- | src/tcp_listener_sup.erl | 3 |
16 files changed, 271 insertions, 171 deletions
diff --git a/src/gm.erl b/src/gm.erl index 0da190a57d..c53e9bb007 100644 --- a/src/gm.erl +++ b/src/gm.erl @@ -399,7 +399,6 @@ -define(FORCE_GC_TIMER, 250). -define(VERSION_START, 0). -define(SETS, ordsets). --define(DICT, orddict). -record(state, { self, @@ -824,8 +823,8 @@ handle_msg({catchup, Left, MembersStateLeft}, members_state = MembersState }) when MembersState =/= undefined -> MembersStateLeft1 = build_members_state(MembersStateLeft), - AllMembers = lists:usort(?DICT:fetch_keys(MembersState) ++ - ?DICT:fetch_keys(MembersStateLeft1)), + AllMembers = lists:usort(maps:keys(MembersState) ++ + maps:keys(MembersStateLeft1)), {MembersState1, Activity} = lists:foldl( fun (Id, MembersStateActivity) -> @@ -995,21 +994,21 @@ is_member_alias(Member, Self, View) -> dead_member_id({dead, Member}) -> Member. store_view_member(VMember = #view_member { id = Id }, {Ver, View}) -> - {Ver, ?DICT:store(Id, VMember, View)}. + {Ver, maps:put(Id, VMember, View)}. with_view_member(Fun, View, Id) -> store_view_member(Fun(fetch_view_member(Id, View)), View). -fetch_view_member(Id, {_Ver, View}) -> ?DICT:fetch(Id, View). +fetch_view_member(Id, {_Ver, View}) -> maps:get(Id, View). -find_view_member(Id, {_Ver, View}) -> ?DICT:find(Id, View). +find_view_member(Id, {_Ver, View}) -> maps:find(Id, View). -blank_view(Ver) -> {Ver, ?DICT:new()}. +blank_view(Ver) -> {Ver, maps:new()}. -alive_view_members({_Ver, View}) -> ?DICT:fetch_keys(View). +alive_view_members({_Ver, View}) -> maps:keys(View). all_known_members({_Ver, View}) -> - ?DICT:fold( + maps:fold( fun (Member, #view_member { aliases = Aliases }, Acc) -> ?SETS:to_list(Aliases) ++ [Member | Acc] end, [], View). @@ -1374,24 +1373,24 @@ with_member_acc(Fun, Id, {MembersState, Acc}) -> {store_member(Id, MemberState, MembersState), Acc1}. find_member_or_blank(Id, MembersState) -> - case ?DICT:find(Id, MembersState) of + case maps:find(Id, MembersState) of {ok, Result} -> Result; error -> blank_member() end. -erase_member(Id, MembersState) -> ?DICT:erase(Id, MembersState). +erase_member(Id, MembersState) -> maps:remove(Id, MembersState). blank_member() -> #member { pending_ack = queue:new(), last_pub = -1, last_ack = -1 }. -blank_member_state() -> ?DICT:new(). +blank_member_state() -> maps:new(). store_member(Id, MemberState, MembersState) -> - ?DICT:store(Id, MemberState, MembersState). + maps:put(Id, MemberState, MembersState). -prepare_members_state(MembersState) -> ?DICT:to_list(MembersState). +prepare_members_state(MembersState) -> maps:to_list(MembersState). -build_members_state(MembersStateList) -> ?DICT:from_list(MembersStateList). +build_members_state(MembersStateList) -> maps:from_list(MembersStateList). make_member(GroupName) -> {case dirty_read_group(GroupName) of diff --git a/src/rabbit.erl b/src/rabbit.erl index cc1e0e08c4..0d0ff2f9fc 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -23,6 +23,7 @@ status/0, is_running/0, alarms/0, is_running/1, environment/0, rotate_logs/0, force_event_refresh/1, start_fhc/0]). + -export([start/2, stop/1, prep_stop/1]). -export([start_apps/1, start_apps/2, stop_apps/1]). -export([log_locations/0, config_files/0, decrypt_config/2]). %% for testing and mgmt-agent @@ -230,6 +231,7 @@ %%---------------------------------------------------------------------------- +-type restart_type() :: 'permanent' | 'transient' | 'temporary'. %% this really should be an abstract type -type log_location() :: string(). -type param() :: atom(). @@ -267,7 +269,7 @@ -spec recover() -> 'ok'. -spec start_apps([app_name()]) -> 'ok'. -spec start_apps([app_name()], - #{app_name() => permanent|transient|temporary}) -> 'ok'. + #{app_name() => restart_type()}) -> 'ok'. -spec stop_apps([app_name()]) -> 'ok'. %%---------------------------------------------------------------------------- @@ -506,7 +508,7 @@ stop_and_halt() -> start_apps(Apps) -> start_apps(Apps, #{}). -start_apps(Apps, AppModes) -> +start_apps(Apps, RestartTypes) -> app_utils:load_applications(Apps), ConfigEntryDecoder = case application:get_env(rabbit, config_entry_decoder) of @@ -547,7 +549,7 @@ start_apps(Apps, AppModes) -> end, ok = app_utils:start_applications(OrderedApps, handle_app_error(could_not_start), - AppModes). + RestartTypes). %% This function retrieves the correct IoDevice for requesting %% input. The problem with using the default IoDevice is that diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index ed88e69378..b51375f9fb 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -22,6 +22,7 @@ -define(SYNC_INTERVAL, 200). %% milliseconds -define(RAM_DURATION_UPDATE_INTERVAL, 5000). +-define(CONSUMER_BIAS_RATIO, 2.0). %% i.e. consume 100% faster -export([info_keys/0]). @@ -1072,27 +1073,27 @@ emit_consumer_deleted(ChPid, ConsumerTag, QName, ActingUser) -> %%---------------------------------------------------------------------------- -prioritise_call(Msg, _From, _Len, _State) -> +prioritise_call(Msg, _From, _Len, State) -> case Msg of info -> 9; {info, _Items} -> 9; consumers -> 9; stat -> 7; - {basic_consume, _, _, _, _, _, _, _, _, _, _} -> 1; - {basic_cancel, _, _, _} -> 1; + {basic_consume, _, _, _, _, _, _, _, _, _} -> consumer_bias(State, 0, 2); + {basic_cancel, _, _, _} -> consumer_bias(State, 0, 2); _ -> 0 end. -prioritise_cast(Msg, _Len, _State) -> +prioritise_cast(Msg, _Len, State) -> case Msg of delete_immediately -> 8; {delete_exclusive, _Pid} -> 8; {set_ram_duration_target, _Duration} -> 8; {set_maximum_since_use, _Age} -> 8; {run_backing_queue, _Mod, _Fun} -> 6; - {ack, _AckTags, _ChPid} -> 3; %% [1] - {resume, _ChPid} -> 2; - {notify_sent, _ChPid, _Credit} -> 1; + {ack, _AckTags, _ChPid} -> 4; %% [1] + {resume, _ChPid} -> 3; + {notify_sent, _ChPid, _Credit} -> consumer_bias(State, 0, 2); _ -> 0 end. @@ -1104,6 +1105,16 @@ prioritise_cast(Msg, _Len, _State) -> %% stack are optimised for that) and to make things easier to reason %% about. Finally, we prioritise ack over resume since it should %% always reduce memory use. +%% bump_reduce_memory_use is prioritised over publishes, because sending +%% credit to self is hard to reason about. Consumers can continue while +%% reduce_memory_use is in progress. + +consumer_bias(#q{backing_queue = BQ, backing_queue_state = BQS}, Low, High) -> + case BQ:msg_rates(BQS) of + {0.0, _} -> Low; + {Ingress, Egress} when Egress / Ingress < ?CONSUMER_BIAS_RATIO -> High; + {_, _} -> Low + end. prioritise_info(Msg, _Len, #q{q = #amqqueue{exclusive_owner = DownPid}}) -> case Msg of @@ -1113,6 +1124,7 @@ prioritise_info(Msg, _Len, #q{q = #amqqueue{exclusive_owner = DownPid}}) -> {drop_expired, _Version} -> 8; emit_stats -> 7; sync_timeout -> 6; + bump_reduce_memory_use -> 1; _ -> 0 end. @@ -1503,6 +1515,10 @@ handle_info({bump_credit, Msg}, State = #q{backing_queue = BQ, %% rabbit_variable_queue:msg_store_write/4. credit_flow:handle_bump_msg(Msg), noreply(State#q{backing_queue_state = BQ:resume(BQS)}); +handle_info(bump_reduce_memory_use, State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + put(waiting_bump, false), + noreply(State#q{backing_queue_state = BQ:resume(BQS)}); handle_info(Info, State) -> {stop, {unhandled_info, Info}, State}. diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 1167ac66f9..c671438ce8 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -186,10 +186,21 @@ -define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]). --define(INCR_STATS(Incs, Measure, State), +-define(INCR_STATS(Type, Key, Inc, Measure, State), case rabbit_event:stats_level(State, #ch.stats_timer) of - fine -> incr_stats(Incs, Measure); - _ -> ok + fine -> + rabbit_core_metrics:channel_stats(Type, Measure, {self(), Key}, Inc), + %% Keys in the process dictionary are used to clean up the core metrics + put({Type, Key}, none); + _ -> + ok + end). + +-define(INCR_STATS(Type, Key, Inc, Measure), + begin + rabbit_core_metrics:channel_stats(Type, Measure, {self(), Key}, Inc), + %% Keys in the process dictionary are used to clean up the core metrics + put({Type, Key}, none) end). %%---------------------------------------------------------------------------- @@ -378,7 +389,16 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, true -> flow; false -> noflow end, - + {ok, {Global, Prefetch}} = application:get_env(rabbit, default_consumer_prefetch), + Limiter0 = rabbit_limiter:new(LimiterPid), + Limiter = case {Global, Prefetch} of + {true, 0} -> + rabbit_limiter:unlimit_prefetch(Limiter0); + {true, _} -> + rabbit_limiter:limit_prefetch(Limiter0, Prefetch, 0); + _ -> + Limiter0 + end, State = #ch{state = starting, protocol = Protocol, channel = Channel, @@ -386,7 +406,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, writer_pid = WriterPid, conn_pid = ConnPid, conn_name = ConnName, - limiter = rabbit_limiter:new(LimiterPid), + limiter = Limiter, tx = none, next_tag = 1, unacked_message_q = queue:new(), @@ -407,7 +427,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, mandatory = dtree:empty(), capabilities = Capabilities, trace_state = rabbit_trace:init(VHost), - consumer_prefetch = 0, + consumer_prefetch = Prefetch, reply_consumer = none, delivery_flow = Flow, interceptor_state = undefined}, @@ -1258,8 +1278,12 @@ handle_method(#'basic.qos'{prefetch_size = Size}, _, _State) when Size /= 0 -> "prefetch_size!=0 (~w)", [Size]); handle_method(#'basic.qos'{global = false, - prefetch_count = PrefetchCount}, _, State) -> - {reply, #'basic.qos_ok'{}, State#ch{consumer_prefetch = PrefetchCount}}; + prefetch_count = PrefetchCount}, + _, State = #ch{limiter = Limiter}) -> + %% Ensures that if default was set, it's overriden + Limiter1 = rabbit_limiter:unlimit_prefetch(Limiter), + {reply, #'basic.qos_ok'{}, State#ch{consumer_prefetch = PrefetchCount, + limiter = Limiter1}}; handle_method(#'basic.qos'{global = true, prefetch_count = 0}, @@ -1632,7 +1656,7 @@ basic_return(#basic_message{exchange_name = ExchangeName, content = Content}, State = #ch{protocol = Protocol, writer_pid = WriterPid}, Reason) -> - ?INCR_STATS([{exchange_stats, ExchangeName, 1}], return_unroutable, State), + ?INCR_STATS(exchange_stats, ExchangeName, 1, return_unroutable, State), {_Close, ReplyCode, ReplyText} = Protocol:lookup_amqp_exception(Reason), ok = rabbit_writer:send_command( WriterPid, @@ -1669,14 +1693,14 @@ record_sent(ConsumerTag, AckRequired, user = #user{username = Username}, conn_name = ConnName, channel = ChannelNum}) -> - ?INCR_STATS([{queue_stats, QName, 1}], case {ConsumerTag, AckRequired} of - {none, true} -> get; - {none, false} -> get_no_ack; - {_ , true} -> deliver; - {_ , false} -> deliver_no_ack - end, State), + ?INCR_STATS(queue_stats, QName, 1, case {ConsumerTag, AckRequired} of + {none, true} -> get; + {none, false} -> get_no_ack; + {_ , true} -> deliver; + {_ , false} -> deliver_no_ack + end, State), case Redelivered of - true -> ?INCR_STATS([{queue_stats, QName, 1}], redeliver, State); + true -> ?INCR_STATS(queue_stats, QName, 1, redeliver, State); false -> ok end, rabbit_trace:tap_out(Msg, ConnName, ChannelNum, Username, TraceState), @@ -1721,11 +1745,11 @@ ack(Acked, State = #ch{queue_names = QNames}) -> foreach_per_queue( fun (QPid, MsgIds) -> ok = rabbit_amqqueue:ack(QPid, MsgIds, self()), - ?INCR_STATS(case maps:find(QPid, QNames) of - {ok, QName} -> Count = length(MsgIds), - [{queue_stats, QName, Count}]; - error -> [] - end, ack, State) + case maps:find(QPid, QNames) of + {ok, QName} -> Count = length(MsgIds), + ?INCR_STATS(queue_stats, QName, Count, ack, State); + error -> ok + end end, Acked), ok = notify_limiter(State#ch.limiter, Acked). @@ -1790,7 +1814,7 @@ deliver_to_queues({#delivery{message = #basic_message{exchange_name = XName}, confirm = false, mandatory = false}, []}, State) -> %% optimisation - ?INCR_STATS([{exchange_stats, XName, 1}], publish, State), + ?INCR_STATS(exchange_stats, XName, 1, publish, State), State; deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ exchange_name = XName}, @@ -1827,11 +1851,15 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ Message, State1), State3 = process_routing_confirm( Confirm, DeliveredQPids, MsgSeqNo, XName, State2), - ?INCR_STATS([{exchange_stats, XName, 1} | - [{queue_exchange_stats, {QName, XName}, 1} || - QPid <- DeliveredQPids, - {ok, QName} <- [maps:find(QPid, QNames1)]]], - publish, State3), + case rabbit_event:stats_level(State3, #ch.stats_timer) of + fine -> + ?INCR_STATS(exchange_stats, XName, 1, publish), + [?INCR_STATS(queue_exchange_stats, {QName, XName}, 1, publish) || + QPid <- DeliveredQPids, + {ok, QName} <- [maps:find(QPid, QNames1)]]; + _ -> + ok + end, State3. process_routing_mandatory(false, _, _MsgSeqNo, _Msg, State) -> @@ -1874,7 +1902,7 @@ send_confirms_and_nacks(State = #ch{tx = none, confirmed = C, rejected = R}) -> ok -> ConfirmMsgSeqNos = lists:foldl( fun ({MsgSeqNo, XName}, MSNs) -> - ?INCR_STATS([{exchange_stats, XName, 1}], + ?INCR_STATS(exchange_stats, XName, 1, confirm, State), [MsgSeqNo | MSNs] end, [], lists:append(C)), @@ -1977,17 +2005,14 @@ i(Item, _) -> name(#ch{conn_name = ConnName, channel = Channel}) -> list_to_binary(rabbit_misc:format("~s (~p)", [ConnName, Channel])). -incr_stats(Incs, Measure) -> - [begin - rabbit_core_metrics:channel_stats(Type, Measure, {self(), Key}, Inc), - %% Keys in the process dictionary are used to clean up the core metrics - put({Type, Key}, none) - end || {Type, Key, Inc} <- Incs]. - emit_stats(State) -> emit_stats(State, []). emit_stats(State, Extra) -> [{reductions, Red} | Coarse0] = infos(?STATISTICS_KEYS, State), + %% First metric must be `idle_since` (if available), as expected by + %% `rabbit_mgmt_format:format_channel_stats`. This is a performance + %% optimisation that avoids traversing the whole list when only + %% one element has to be formatted. rabbit_core_metrics:channel_stats(self(), Extra ++ Coarse0), rabbit_core_metrics:channel_stats(reductions, self(), Red). diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index e81b10a555..93bacc568d 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -184,7 +184,7 @@ %% 'Notify' is a boolean that indicates whether a queue should be %% notified of a change in the limit or volume that may allow it to %% deliver more messages via the limiter's channel. - queues = orddict:new(), % QPid -> {MonitorRef, Notify} + queues = maps:new(), % QPid -> {MonitorRef, Notify} volume = 0}). %% mode is of type credit_mode() @@ -402,28 +402,28 @@ prefetch_limit_reached(#lim{prefetch_count = Limit, volume = Volume}) -> Limit =/= 0 andalso Volume >= Limit. remember_queue(QPid, State = #lim{queues = Queues}) -> - case orddict:is_key(QPid, Queues) of + case maps:is_key(QPid, Queues) of false -> MRef = erlang:monitor(process, QPid), - State#lim{queues = orddict:store(QPid, {MRef, false}, Queues)}; + State#lim{queues = maps:put(QPid, {MRef, false}, Queues)}; true -> State end. forget_queue(QPid, State = #lim{queues = Queues}) -> - case orddict:find(QPid, Queues) of + case maps:find(QPid, Queues) of {ok, {MRef, _}} -> true = erlang:demonitor(MRef), - State#lim{queues = orddict:erase(QPid, Queues)}; + State#lim{queues = maps:remove(QPid, Queues)}; error -> State end. limit_queue(QPid, State = #lim{queues = Queues}) -> UpdateFun = fun ({MRef, _}) -> {MRef, true} end, - State#lim{queues = orddict:update(QPid, UpdateFun, Queues)}. + State#lim{queues = maps:update_with(QPid, UpdateFun, Queues)}. notify_queues(State = #lim{ch_pid = ChPid, queues = Queues}) -> {QList, NewQueues} = - orddict:fold(fun (_QPid, {_, false}, Acc) -> Acc; + maps:fold(fun (_QPid, {_, false}, Acc) -> Acc; (QPid, {MRef, true}, {L, D}) -> - {[QPid | L], orddict:store(QPid, {MRef, false}, D)} + {[QPid | L], maps:put(QPid, {MRef, false}, D)} end, {[], Queues}, Queues), case length(QList) of 0 -> ok; diff --git a/src/rabbit_looking_glass.erl b/src/rabbit_looking_glass.erl index 702fb41eb8..6cd53dbc23 100644 --- a/src/rabbit_looking_glass.erl +++ b/src/rabbit_looking_glass.erl @@ -17,6 +17,7 @@ -module(rabbit_looking_glass). -ignore_xref([{lg, trace, 4}]). +-ignore_xref([{maps, from_list, 1}]). -export([boot/0]). -export([connections/0]). @@ -27,12 +28,21 @@ boot() -> ok; Value -> Input = parse_value(Value), - rabbit_log:info("Enabling Looking Glass profiler, input value: ~p", [Input]), + rabbit_log:info( + "Enabling Looking Glass profiler, input value: ~p", + [Input] + ), {ok, _} = application:ensure_all_started(looking_glass), - lg:trace(Input, lg_file_tracer, "traces.lz4", #{ - mode => profile, - running => true, - send => true}) + lg:trace( + Input, + lg_file_tracer, + "traces.lz4", + maps:from_list([ + {mode, profile}, + {running, true}, + {send, true}] + ) + ) end. parse_value(Value) -> diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 6139099ed1..65a13f03c0 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -374,6 +374,9 @@ handle_info({bump_credit, Msg}, State) -> credit_flow:handle_bump_msg(Msg), noreply(State); +handle_info(bump_reduce_memory_use, State) -> + noreply(State); + %% In the event of a short partition during sync we can detect the %% master's 'death', drop out of sync, and then receive sync messages %% which were still in flight. Ignore them. diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index fe78075d0f..76976ad771 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -780,7 +780,7 @@ init([Type, BaseDir, ClientRefs, StartupFunState]) -> sync_timer_ref = undefined, sum_valid_data = 0, sum_file_size = 0, - pending_gc_completion = orddict:new(), + pending_gc_completion = maps:new(), gc_pid = GCPid, file_handles_ets = FileHandlesEts, file_summary_ets = FileSummaryEts, @@ -1269,7 +1269,7 @@ contains_message(MsgId, From, gen_server2:reply(From, false), State; #msg_location { file = File } -> - case orddict:is_key(File, Pending) of + case maps:is_key(File, Pending) of true -> add_to_pending_gc_completion( {contains, MsgId, From}, File, State); false -> gen_server2:reply(From, true), @@ -1280,16 +1280,16 @@ contains_message(MsgId, From, add_to_pending_gc_completion( Op, File, State = #msstate { pending_gc_completion = Pending }) -> State #msstate { pending_gc_completion = - rabbit_misc:orddict_cons(File, Op, Pending) }. + rabbit_misc:maps_cons(File, Op, Pending) }. run_pending(Files, State) -> lists:foldl( fun (File, State1 = #msstate { pending_gc_completion = Pending }) -> - Pending1 = orddict:erase(File, Pending), + Pending1 = maps:remove(File, Pending), lists:foldl( fun run_pending_action/2, State1 #msstate { pending_gc_completion = Pending1 }, - lists:reverse(orddict:fetch(File, Pending))) + lists:reverse(maps:get(File, Pending))) end, State, Files). run_pending_action({read, MsgId, From}, State) -> @@ -1320,9 +1320,9 @@ adjust_valid_total_size(File, Delta, State = #msstate { [{#file_summary.valid_total_size, Delta}]), State #msstate { sum_valid_data = SumValid + Delta }. -orddict_store(Key, Val, Dict) -> - false = orddict:is_key(Key, Dict), - orddict:store(Key, Val, Dict). +maps_store(Key, Val, Dict) -> + false = maps:is_key(Key, Dict), + maps:put(Key, Val, Dict). update_pending_confirms(Fun, CRef, State = #msstate { clients = Clients, @@ -1860,7 +1860,7 @@ maybe_compact(State = #msstate { sum_valid_data = SumValid, %% complete traversal of FileSummaryEts. First = ets:first(FileSummaryEts), case First =:= '$end_of_table' orelse - orddict:size(Pending) >= ?MAXIMUM_SIMULTANEOUS_GC_FILES of + maps:size(Pending) >= ?MAXIMUM_SIMULTANEOUS_GC_FILES of true -> State; false -> @@ -1869,8 +1869,8 @@ maybe_compact(State = #msstate { sum_valid_data = SumValid, not_found -> State; {Src, Dst} -> - Pending1 = orddict_store(Dst, [], - orddict_store(Src, [], Pending)), + Pending1 = maps_store(Dst, [], + maps_store(Src, [], Pending)), State1 = close_handle(Src, close_handle(Dst, State)), true = ets:update_element(FileSummaryEts, Src, {#file_summary.locked, true}), @@ -1926,7 +1926,7 @@ delete_file_if_empty(File, State = #msstate { 0 -> true = ets:update_element(FileSummaryEts, File, {#file_summary.locked, true}), ok = rabbit_msg_store_gc:delete(GCPid, File), - Pending1 = orddict_store(File, [], Pending), + Pending1 = maps_store(File, [], Pending), close_handle(File, State #msstate { pending_gc_completion = Pending1 }); _ -> State diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl index 8bf79f7130..565676af3c 100644 --- a/src/rabbit_node_monitor.erl +++ b/src/rabbit_node_monitor.erl @@ -350,7 +350,7 @@ init([]) -> subscribers = pmon:new(), partitions = [], guid = rabbit_guid:gen(), - node_guids = orddict:new(), + node_guids = maps:new(), autoheal = rabbit_autoheal:init()})}. handle_call(partitions, _From, State = #state{partitions = Partitions}) -> @@ -405,17 +405,17 @@ handle_cast({node_up, Node, NodeType, GUID}, State = #state{guid = MyGUID, node_guids = GUIDs}) -> cast(Node, {announce_guid, node(), MyGUID}), - GUIDs1 = orddict:store(Node, GUID, GUIDs), + GUIDs1 = maps:put(Node, GUID, GUIDs), handle_cast({node_up, Node, NodeType}, State#state{node_guids = GUIDs1}); handle_cast({announce_guid, Node, GUID}, State = #state{node_guids = GUIDs}) -> - {noreply, State#state{node_guids = orddict:store(Node, GUID, GUIDs)}}; + {noreply, State#state{node_guids = maps:put(Node, GUID, GUIDs)}}; handle_cast({check_partial_partition, Node, Rep, NodeGUID, MyGUID, RepGUID}, State = #state{guid = MyGUID, node_guids = GUIDs}) -> case lists:member(Node, rabbit_mnesia:cluster_nodes(running)) andalso - orddict:find(Node, GUIDs) =:= {ok, NodeGUID} of + maps:find(Node, GUIDs) =:= {ok, NodeGUID} of true -> spawn_link( %%[1] fun () -> case rpc:call(Node, rabbit, is_running, []) of @@ -560,10 +560,10 @@ handle_info({nodedown, Node, Info}, State = #state{guid = MyGUID, cast(N, {check_partial_partition, Node, node(), DownGUID, CheckGUID, MyGUID}) end, - case orddict:find(Node, GUIDs) of + case maps:find(Node, GUIDs) of {ok, DownGUID} -> Alive = rabbit_mnesia:cluster_nodes(running) -- [node(), Node], - [case orddict:find(N, GUIDs) of + [case maps:find(N, GUIDs) of {ok, CheckGUID} -> Check(N, CheckGUID, DownGUID); error -> ok end || N <- Alive]; diff --git a/src/rabbit_priority_queue.erl b/src/rabbit_priority_queue.erl index 41e65e8a1f..9f9ef5adca 100644 --- a/src/rabbit_priority_queue.erl +++ b/src/rabbit_priority_queue.erl @@ -207,13 +207,13 @@ publish(Msg, MsgProps, IsDelivered, ChPid, Flow, ?passthrough1(publish(Msg, MsgProps, IsDelivered, ChPid, Flow, BQS)). batch_publish(Publishes, ChPid, Flow, State = #state{bq = BQ, bqss = [{MaxP, _} |_]}) -> - PubDict = partition_publish_batch(Publishes, MaxP), + PubMap = partition_publish_batch(Publishes, MaxP), lists:foldl( fun ({Priority, Pubs}, St) -> pick1(fun (_P, BQSN) -> BQ:batch_publish(Pubs, ChPid, Flow, BQSN) end, Priority, St) - end, State, orddict:to_list(PubDict)); + end, State, maps:to_list(PubMap)); batch_publish(Publishes, ChPid, Flow, State = #passthrough{bq = BQ, bqs = BQS}) -> ?passthrough1(batch_publish(Publishes, ChPid, Flow, BQS)). @@ -229,7 +229,7 @@ publish_delivered(Msg, MsgProps, ChPid, Flow, ?passthrough2(publish_delivered(Msg, MsgProps, ChPid, Flow, BQS)). batch_publish_delivered(Publishes, ChPid, Flow, State = #state{bq = BQ, bqss = [{MaxP, _} |_]}) -> - PubDict = partition_publish_delivered_batch(Publishes, MaxP), + PubMap = partition_publish_delivered_batch(Publishes, MaxP), {PrioritiesAndAcks, State1} = lists:foldl( fun ({Priority, Pubs}, {PriosAndAcks, St}) -> @@ -241,7 +241,7 @@ batch_publish_delivered(Publishes, ChPid, Flow, State = #state{bq = BQ, bqss = [ {priority_on_acktags(P, AckTags), BQSN1} end, Priority, St), {[PriosAndAcks1 | PriosAndAcks], St1} - end, {[], State}, orddict:to_list(PubDict)), + end, {[], State}, maps:to_list(PubMap)), {lists:reverse(PrioritiesAndAcks), State1}; batch_publish_delivered(Publishes, ChPid, Flow, State = #passthrough{bq = BQ, bqs = BQS}) -> @@ -327,7 +327,7 @@ ackfold(MsgFun, Acc, State = #state{bq = BQ}, AckTags) -> AckTagsByPriority = partition_acktags(AckTags), fold2( fun (P, BQSN, AccN) -> - case orddict:find(P, AckTagsByPriority) of + case maps:find(P, AckTagsByPriority) of {ok, ATagsN} -> {AccN1, BQSN1} = BQ:ackfold(MsgFun, AccN, BQSN, ATagsN), {priority_on_acktags(P, AccN1), BQSN1}; @@ -439,7 +439,7 @@ zip_msgs_and_acks(Msgs, AckTags, Accumulator, #state{bqss = [{MaxP, _} |_]}) -> MsgsByPriority = partition_publish_delivered_batch(Msgs, MaxP), lists:foldl(fun (Acks, MAs) -> {P, _AckTag} = hd(Acks), - Pubs = orddict:fetch(P, MsgsByPriority), + Pubs = maps:get(P, MsgsByPriority), MAs0 = zip_msgs_and_acks(Pubs, Acks), MAs ++ MAs0 end, Accumulator, AckTags); @@ -527,7 +527,7 @@ fold_min2(Fun, State) -> fold_by_acktags2(Fun, AckTags, State) -> AckTagsByPriority = partition_acktags(AckTags), fold_append2(fun (P, BQSN) -> - case orddict:find(P, AckTagsByPriority) of + case maps:find(P, AckTagsByPriority) of {ok, AckTagsN} -> Fun(AckTagsN, BQSN); error -> {[], BQSN} end @@ -597,11 +597,11 @@ partition_publishes(Publishes, ExtractMsg, MaxP) -> Partitioned = lists:foldl(fun (Pub, Dict) -> Msg = ExtractMsg(Pub), - rabbit_misc:orddict_cons(priority(Msg, MaxP), Pub, Dict) - end, orddict:new(), Publishes), - orddict:map(fun (_P, RevPubs) -> - lists:reverse(RevPubs) - end, Partitioned). + rabbit_misc:maps_cons(priority(Msg, MaxP), Pub, Dict) + end, maps:new(), Publishes), + maps:map(fun (_P, RevPubs) -> + lists:reverse(RevPubs) + end, Partitioned). priority_bq(Priority, [{MaxP, _} | _] = BQSs) -> @@ -625,14 +625,14 @@ add_maybe_infinity(infinity, _) -> infinity; add_maybe_infinity(_, infinity) -> infinity; add_maybe_infinity(A, B) -> A + B. -partition_acktags(AckTags) -> partition_acktags(AckTags, orddict:new()). +partition_acktags(AckTags) -> partition_acktags(AckTags, maps:new()). partition_acktags([], Partitioned) -> - orddict:map(fun (_P, RevAckTags) -> - lists:reverse(RevAckTags) - end, Partitioned); + maps:map(fun (_P, RevAckTags) -> + lists:reverse(RevAckTags) + end, Partitioned); partition_acktags([{P, AckTag} | Rest], Partitioned) -> - partition_acktags(Rest, rabbit_misc:orddict_cons(P, AckTag, Partitioned)). + partition_acktags(Rest, rabbit_misc:maps_cons(P, AckTag, Partitioned)). priority_on_acktags(P, AckTags) -> [case Tag of diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl index f13a46fcf3..0fe3065fe8 100644 --- a/src/rabbit_queue_consumers.erl +++ b/src/rabbit_queue_consumers.erl @@ -254,9 +254,9 @@ subtract_acks(ChPid, AckTags, State) -> not_found; C = #cr{acktags = ChAckTags, limiter = Lim} -> {CTagCounts, AckTags2} = subtract_acks( - AckTags, [], orddict:new(), ChAckTags), + AckTags, [], maps:new(), ChAckTags), {Unblocked, Lim2} = - orddict:fold( + maps:fold( fun (CTag, Count, {UnblockedN, LimN}) -> {Unblocked1, LimN1} = rabbit_limiter:ack_from_queue(LimN, CTag, Count), @@ -278,7 +278,7 @@ subtract_acks([T | TL] = AckTags, Prefix, CTagCounts, AckQ) -> case queue:out(AckQ) of {{value, {T, CTag}}, QTail} -> subtract_acks(TL, Prefix, - orddict:update_counter(CTag, 1, CTagCounts), QTail); + maps:update_with(CTag, fun (Old) -> Old + 1 end, 1, CTagCounts), QTail); {{value, V}, QTail} -> subtract_acks(AckTags, [V | Prefix], CTagCounts, QTail); {empty, _} -> diff --git a/src/rabbit_queue_location_validator.erl b/src/rabbit_queue_location_validator.erl index 2b0f8c7a0f..e70bcd314a 100644 --- a/src/rabbit_queue_location_validator.erl +++ b/src/rabbit_queue_location_validator.erl @@ -54,7 +54,6 @@ module(#amqqueue{} = Q) -> undefined -> no_location_strategy; Mode -> module(Mode) end; - module(Strategy) when is_binary(Strategy) -> case rabbit_registry:binary_to_type(Strategy) of {error, not_found} -> no_location_strategy; @@ -68,4 +67,6 @@ module(Strategy) when is_binary(Strategy) -> _ -> no_location_strategy end - end. + end; +module(Strategy) -> + module(rabbit_data_coercion:to_binary(Strategy)). diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index ac3d60bb11..49f4d8d3ed 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -829,7 +829,7 @@ set_ram_duration_target( (TargetRamCount =/= infinity andalso TargetRamCount1 >= TargetRamCount) of true -> State1; - false -> maybe_reduce_memory_use(State1) + false -> reduce_memory_use(State1) end). maybe_update_rates(State = #vqstate{ in_counter = InCount, @@ -911,7 +911,7 @@ timeout(State = #vqstate { index_state = IndexState }) -> handle_pre_hibernate(State = #vqstate { index_state = IndexState }) -> State #vqstate { index_state = rabbit_queue_index:flush(IndexState) }. -resume(State) -> a(maybe_reduce_memory_use(State)). +resume(State) -> a(reduce_memory_use(State)). msg_rates(#vqstate { rates = #rates { in = AvgIngressRate, out = AvgEgressRate } }) -> @@ -1776,7 +1776,7 @@ remove_queue_entries(Q, DelsAndAcksFun, State = #vqstate{msg_store_clients = MSCState}) -> {MsgIdsByStore, Delivers, Acks, State1} = ?QUEUE:foldl(fun remove_queue_entries1/2, - {orddict:new(), [], [], State}, Q), + {maps:new(), [], [], State}, Q), remove_msgs_by_id(MsgIdsByStore, MSCState), DelsAndAcksFun(Delivers, Acks, State1). @@ -1786,7 +1786,7 @@ remove_queue_entries1( is_persistent = IsPersistent} = MsgStatus, {MsgIdsByStore, Delivers, Acks, State}) -> {case MsgInStore of - true -> rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore); + true -> rabbit_misc:maps_cons(IsPersistent, MsgId, MsgIdsByStore); false -> MsgIdsByStore end, cons_if(IndexOnDisk andalso not IsDelivered, SeqId, Delivers), @@ -2143,27 +2143,27 @@ purge_pending_ack1(State = #vqstate { ram_pending_ack = RPA, qi_pending_ack = gb_trees:empty()}, {IndexOnDiskSeqIds, MsgIdsByStore, State1}. -%% MsgIdsByStore is an orddict with two keys: +%% MsgIdsByStore is an map with two keys: %% %% true: holds a list of Persistent Message Ids. %% false: holds a list of Transient Message Ids. %% -%% When we call orddict:to_list/1 we get two sets of msg ids, where +%% When we call maps:to_list/1 we get two sets of msg ids, where %% IsPersistent is either true for persistent messages or false for %% transient ones. The msg_store_remove/3 function takes this boolean %% flag to determine from which store the messages should be removed %% from. remove_msgs_by_id(MsgIdsByStore, MSCState) -> [ok = msg_store_remove(MSCState, IsPersistent, MsgIds) - || {IsPersistent, MsgIds} <- orddict:to_list(MsgIdsByStore)]. + || {IsPersistent, MsgIds} <- maps:to_list(MsgIdsByStore)]. remove_transient_msgs_by_id(MsgIdsByStore, MSCState) -> - case orddict:find(false, MsgIdsByStore) of + case maps:find(false, MsgIdsByStore) of error -> ok; {ok, MsgIds} -> ok = msg_store_remove(MSCState, false, MsgIds) end. -accumulate_ack_init() -> {[], orddict:new(), []}. +accumulate_ack_init() -> {[], maps:new(), []}. accumulate_ack(#msg_status { seq_id = SeqId, msg_id = MsgId, @@ -2173,7 +2173,7 @@ accumulate_ack(#msg_status { seq_id = SeqId, {IndexOnDiskSeqIdsAcc, MsgIdsByStore, AllMsgIds}) -> {cons_if(IndexOnDisk, SeqId, IndexOnDiskSeqIdsAcc), case MsgInStore of - true -> rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore); + true -> rabbit_misc:maps_cons(IsPersistent, MsgId, MsgIdsByStore); false -> MsgIdsByStore end, [MsgId | AllMsgIds]}. @@ -2407,45 +2407,79 @@ reduce_memory_use(State = #vqstate { out = AvgEgress, ack_in = AvgAckIngress, ack_out = AvgAckEgress } }) -> - State1 = #vqstate { q2 = Q2, q3 = Q3 } = + {CreditDiscBound, _} =rabbit_misc:get_env(rabbit, + msg_store_credit_disc_bound, + ?CREDIT_DISC_BOUND), + {NeedResumeA2B, State1} = {_, #vqstate { q2 = Q2, q3 = Q3 }} = case chunk_size(RamMsgCount + gb_trees:size(RPA), TargetRamCount) of - 0 -> State; + 0 -> {false, State}; %% Reduce memory of pending acks and alphas. The order is %% determined based on which is growing faster. Whichever %% comes second may very well get a quota of 0 if the %% first manages to push out the max number of messages. - S1 -> Funs = case ((AvgAckIngress - AvgAckEgress) > + A2BChunk -> + %% In case there are few messages to be sent to a message store + %% and many messages to be embedded to the queue index, + %% we should limit the number of messages to be flushed + %% to avoid blocking the process. + A2BChunkActual = case A2BChunk > CreditDiscBound * 2 of + true -> CreditDiscBound * 2; + false -> A2BChunk + end, + Funs = case ((AvgAckIngress - AvgAckEgress) > (AvgIngress - AvgEgress)) of true -> [fun limit_ram_acks/2, fun push_alphas_to_betas/2]; false -> [fun push_alphas_to_betas/2, fun limit_ram_acks/2] end, - {_, State2} = lists:foldl(fun (ReduceFun, {QuotaN, StateN}) -> + {Quota, State2} = lists:foldl(fun (ReduceFun, {QuotaN, StateN}) -> ReduceFun(QuotaN, StateN) - end, {S1, State}, Funs), - State2 + end, {A2BChunkActual, State}, Funs), + {(Quota == 0) andalso (A2BChunk > A2BChunkActual), State2} end, - - State3 = + Permitted = permitted_beta_count(State1), + {NeedResumeB2D, State3} = %% If there are more messages with their queue position held in RAM, %% a.k.a. betas, in Q2 & Q3 than IoBatchSize, %% write their queue position to disk, a.k.a. push_betas_to_deltas case chunk_size(?QUEUE:len(Q2) + ?QUEUE:len(Q3), - permitted_beta_count(State1)) of - S2 when S2 >= IoBatchSize -> - %% There is an implicit, but subtle, upper bound here. We - %% may shuffle a lot of messages from Q2/3 into delta, but - %% the number of these that require any disk operation, - %% namely index writing, i.e. messages that are genuine - %% betas and not gammas, is bounded by the credit_flow - %% limiting of the alpha->beta conversion above. - push_betas_to_deltas(S2, State1); + Permitted) of + B2DChunk when B2DChunk >= IoBatchSize -> + %% Same as for alphas to betas. Limit a number of messages + %% to be flushed to disk at once to avoid blocking the process. + B2DChunkActual = case B2DChunk > CreditDiscBound * 2 of + true -> CreditDiscBound * 2; + false -> B2DChunk + end, + StateBD = push_betas_to_deltas(B2DChunkActual, State1), + {B2DChunk > B2DChunkActual, StateBD}; _ -> - State1 + {false, State1} end, - %% See rabbitmq-server-290 for the reasons behind this GC call. - garbage_collect(), + %% We can be blocked by the credit flow, or limited by a batch size, + %% or finished with flushing. + %% If blocked by the credit flow - the credit grant will resume processing, + %% if limited by a batch - the batch continuation message should be sent. + %% The continuation message will be prioritised over publishes, + %% but not cinsumptions, so the queue can make progess. + Blocked = credit_flow:blocked(), + case {Blocked, NeedResumeA2B orelse NeedResumeB2D} of + %% Credit bump will continue paging + {true, _} -> ok; + %% Finished with paging + {false, false} -> ok; + %% Planning next batch + {false, true} -> + %% We don't want to use self-credit-flow, because it's harder to + %% reason about. So the process sends a (prioritised) message to + %% itself and sets a waiting_bump value to keep the message box clean + case get(waiting_bump) of + true -> ok; + _ -> self() ! bump_reduce_memory_use, + put(waiting_bump, true) + end + end, State3; %% When using lazy queues, there are no alphas, so we don't need to %% call push_alphas_to_betas/2. diff --git a/src/rabbit_version.erl b/src/rabbit_version.erl index 3d69040b04..710e89922e 100644 --- a/src/rabbit_version.erl +++ b/src/rabbit_version.erl @@ -197,10 +197,10 @@ categorise_by_scope(Version) when is_list(Version) -> rabbit_misc:all_module_attributes(rabbit_upgrade), {Name, Scope, _Requires} <- Attributes, lists:member(Name, Version)], - orddict:to_list( + maps:to_list( lists:foldl(fun ({Scope, Name}, CatVersion) -> - rabbit_misc:orddict_cons(Scope, Name, CatVersion) - end, orddict:new(), Categorised)). + rabbit_misc:maps_cons(Scope, Name, CatVersion) + end, maps:new(), Categorised)). dir() -> rabbit_mnesia:dir(). diff --git a/src/rabbit_vm.erl b/src/rabbit_vm.erl index 88062dc32a..b2474a6b38 100644 --- a/src/rabbit_vm.erl +++ b/src/rabbit_vm.erl @@ -51,8 +51,6 @@ memory() -> 0 end, MgmtDbETS = ets_memory([rabbit_mgmt_storage]), - VMTotal = vm_memory_monitor:get_process_memory(), - [{total, ErlangTotal}, {processes, Processes}, {ets, ETS}, @@ -62,48 +60,59 @@ memory() -> {system, System}] = erlang:memory([total, processes, ets, atom, binary, code, system]), - Unaccounted = case VMTotal - ErlangTotal of - GTZ when GTZ > 0 -> GTZ; - _LTZ -> 0 + Strategy = vm_memory_monitor:get_memory_calculation_strategy(), + {Allocated, VMTotal} = case Strategy of + erlang -> {ErlangTotal, ErlangTotal}; + allocated -> + Alloc = recon_alloc:memory(allocated), + {Alloc, Alloc}; + rss -> + Alloc = recon_alloc:memory(allocated), + Vm = vm_memory_monitor:get_process_memory(current), + {Alloc, Vm} end, + AllocatedUnused = max(Allocated - ErlangTotal, 0), + OSReserved = max(VMTotal - Allocated, 0), + OtherProc = Processes - ConnsReader - ConnsWriter - ConnsChannel - ConnsOther - Qs - QsSlave - MsgIndexProc - Plugins - MgmtDbProc - MetricsProc, [ %% Connections - {connection_readers, ConnsReader}, - {connection_writers, ConnsWriter}, - {connection_channels, ConnsChannel}, - {connection_other, ConnsOther}, + {connection_readers, ConnsReader}, + {connection_writers, ConnsWriter}, + {connection_channels, ConnsChannel}, + {connection_other, ConnsOther}, %% Queues - {queue_procs, Qs}, - {queue_slave_procs, QsSlave}, + {queue_procs, Qs}, + {queue_slave_procs, QsSlave}, %% Processes - {plugins, Plugins}, - {other_proc, lists:max([0, OtherProc])}, %% [1] + {plugins, Plugins}, + {other_proc, lists:max([0, OtherProc])}, %% [1] %% Metrics - {metrics, MetricsETS + MetricsProc}, - {mgmt_db, MgmtDbETS + MgmtDbProc}, + {metrics, MetricsETS + MetricsProc}, + {mgmt_db, MgmtDbETS + MgmtDbProc}, %% ETS - {mnesia, MnesiaETS}, - {other_ets, ETS - MnesiaETS - MetricsETS - MgmtDbETS - MsgIndexETS}, + {mnesia, MnesiaETS}, + {other_ets, ETS - MnesiaETS - MetricsETS - MgmtDbETS - MsgIndexETS}, %% Messages (mostly, some binaries are not messages) - {binary, Bin}, - {msg_index, MsgIndexETS + MsgIndexProc}, + {binary, Bin}, + {msg_index, MsgIndexETS + MsgIndexProc}, %% System - {code, Code}, - {atom, Atom}, - {other_system, System - ETS - Bin - Code - Atom + Unaccounted}, - - {total, VMTotal} + {code, Code}, + {atom, Atom}, + {other_system, System - ETS - Bin - Code - Atom}, + {allocated_unused, AllocatedUnused}, + {reserved_unallocated, OSReserved}, + {total, VMTotal} ]. %% [1] - erlang:memory(processes) can be less than the sum of its %% parts. Rather than display something nonsensical, just silence any diff --git a/src/tcp_listener_sup.erl b/src/tcp_listener_sup.erl index 14654535d6..7cb1214c8e 100644 --- a/src/tcp_listener_sup.erl +++ b/src/tcp_listener_sup.erl @@ -49,10 +49,11 @@ start_link(IPAddress, Port, Transport, SocketOpts, ProtoSup, ProtoOpts, OnStartu init({IPAddress, Port, Transport, SocketOpts, ProtoSup, ProtoOpts, OnStartup, OnShutdown, ConcurrentAcceptorCount, Label}) -> {ok, AckTimeout} = application:get_env(rabbit, ssl_handshake_timeout), + MaxConnections = rabbit_misc:get_env(rabbit, connection_max, infinity), {ok, {{one_for_all, 10, 10}, [ ranch:child_spec({acceptor, IPAddress, Port}, ConcurrentAcceptorCount, Transport, [{port, Port}, {ip, IPAddress}, - {max_connections, infinity}, + {max_connections, MaxConnections}, {ack_timeout, AckTimeout}, {connection_type, supervisor}|SocketOpts], ProtoSup, ProtoOpts), |
