diff options
| author | Emile Joubert <emile@rabbitmq.com> | 2011-10-06 16:12:13 +0100 |
|---|---|---|
| committer | Emile Joubert <emile@rabbitmq.com> | 2011-10-06 16:12:13 +0100 |
| commit | f88351e1078750fcfc5f8b8a66bda5a3b1aece12 (patch) | |
| tree | 04aff3de5e70405f7cafdcc283e44a8e80d371b3 /src | |
| parent | b1ef2b126a7e59775c28702e3df18be926b33dbe (diff) | |
| parent | cb360e2c53d982631201e5c3fc33589528853cfd (diff) | |
| download | rabbitmq-server-git-f88351e1078750fcfc5f8b8a66bda5a3b1aece12.tar.gz | |
Merged bug24461 into default
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit.erl | 8 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 64 | ||||
| -rw-r--r-- | src/rabbit_control.erl | 8 | ||||
| -rw-r--r-- | src/rabbit_limiter.erl | 1 | ||||
| -rw-r--r-- | src/rabbit_memory_monitor.erl | 18 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_master.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_misc.erl | 11 | ||||
| -rw-r--r-- | src/rabbit_router.erl | 16 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 3 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 89 | ||||
| -rw-r--r-- | src/vm_memory_monitor.erl | 54 |
11 files changed, 146 insertions, 128 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl index 47bc443303..e98ca9be33 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -249,7 +249,13 @@ status() -> {running_applications, application:which_applications(infinity)}, {os, os:type()}, {erlang_version, erlang:system_info(system_version)}, - {memory, erlang:memory()}]. + {memory, erlang:memory()}] ++ + rabbit_misc:filter_exit_map( + fun ({Key, {M, F, A}}) -> {Key, erlang:apply(M, F, A)} end, + [{vm_memory_high_watermark, {vm_memory_monitor, + get_vm_memory_high_watermark, []}}, + {vm_memory_limit, {vm_memory_monitor, + get_memory_limit, []}}]). is_running() -> is_running(node()). diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 706aa73bc2..883e570ad6 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -34,7 +34,7 @@ -record(ch, {state, protocol, channel, reader_pid, writer_pid, conn_pid, limiter, tx_status, next_tag, - unacked_message_q, uncommitted_message_q, uncommitted_ack_q, + unacked_message_q, uncommitted_message_q, uncommitted_acks, user, virtual_host, most_recently_declared_queue, queue_monitors, consumer_mapping, blocking, queue_consumers, queue_collector_pid, stats_timer, confirm_enabled, publish_seqno, unconfirmed_mq, @@ -184,7 +184,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost, next_tag = 1, unacked_message_q = queue:new(), uncommitted_message_q = queue:new(), - uncommitted_ack_q = queue:new(), + uncommitted_acks = [], user = User, virtual_host = VHost, most_recently_declared_queue = <<>>, @@ -667,15 +667,14 @@ handle_method(#'basic.nack'{delivery_tag = DeliveryTag, handle_method(#'basic.ack'{delivery_tag = DeliveryTag, multiple = Multiple}, - _, State = #ch{unacked_message_q = UAMQ, - tx_status = TxStatus}) -> + _, State = #ch{unacked_message_q = UAMQ, tx_status = TxStatus}) -> {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple), State1 = State#ch{unacked_message_q = Remaining}, {noreply, case TxStatus of none -> ack(Acked, State1); - in_progress -> NewTAQ = queue:join(State1#ch.uncommitted_ack_q, Acked), - State1#ch{uncommitted_ack_q = NewTAQ} + in_progress -> State1#ch{uncommitted_acks = + Acked ++ State1#ch.uncommitted_acks} end}; handle_method(#'basic.get'{queue = QueueNameBin, @@ -837,6 +836,7 @@ handle_method(#'basic.recover_async'{requeue = true}, _, State = #ch{unacked_message_q = UAMQ, limiter = Limiter}) -> OkFun = fun () -> ok end, + UAMQL = queue:to_list(UAMQ), ok = fold_per_queue( fun (QPid, MsgIds, ok) -> rabbit_misc:with_exit_handler( @@ -844,8 +844,8 @@ handle_method(#'basic.recover_async'{requeue = true}, rabbit_amqqueue:requeue( QPid, MsgIds, self()) end) - end, ok, UAMQ), - ok = notify_limiter(Limiter, UAMQ), + end, ok, UAMQL), + ok = notify_limiter(Limiter, UAMQL), %% No answer required - basic.recover is the newer, synchronous %% variant of this method {noreply, State#ch{unacked_message_q = queue:new()}}; @@ -1069,8 +1069,8 @@ handle_method(#'tx.commit'{}, _, #ch{tx_status = none}) -> precondition_failed, "channel is not transactional", []); handle_method(#'tx.commit'{}, _, State = #ch{uncommitted_message_q = TMQ, - uncommitted_ack_q = TAQ}) -> - State1 = new_tx(ack(TAQ, rabbit_misc:queue_fold(fun deliver_to_queues/2, + uncommitted_acks = TAL}) -> + State1 = new_tx(ack(TAL, rabbit_misc:queue_fold(fun deliver_to_queues/2, State, TMQ))), {noreply, maybe_complete_tx(State1#ch{tx_status = committing})}; @@ -1078,10 +1078,11 @@ handle_method(#'tx.rollback'{}, _, #ch{tx_status = none}) -> rabbit_misc:protocol_error( precondition_failed, "channel is not transactional", []); -handle_method(#'tx.rollback'{}, _, State = #ch{unacked_message_q = UAMQ, - uncommitted_ack_q = TAQ}) -> - {reply, #'tx.rollback_ok'{}, new_tx(State#ch{unacked_message_q = - queue:join(TAQ, UAMQ)})}; +handle_method(#'tx.rollback'{}, _, State = #ch{unacked_message_q = UAMQ, + uncommitted_acks = TAL}) -> + TAQ = queue:from_list(lists:reverse(TAL)), + {reply, #'tx.rollback_ok'{}, + new_tx(State#ch{unacked_message_q = queue:join(TAQ, UAMQ)})}; handle_method(#'confirm.select'{}, _, #ch{tx_status = in_progress}) -> rabbit_misc:protocol_error( @@ -1280,18 +1281,18 @@ ack_record(DeliveryTag, ConsumerTag, {DeliveryTag, ConsumerTag, {QPid, MsgId}}. collect_acks(Q, 0, true) -> - {Q, queue:new()}; + {queue:to_list(Q), queue:new()}; collect_acks(Q, DeliveryTag, Multiple) -> - collect_acks(queue:new(), queue:new(), Q, DeliveryTag, Multiple). + collect_acks([], queue:new(), Q, DeliveryTag, Multiple). collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) -> case queue:out(Q) of {{value, UnackedMsg = {CurrentDeliveryTag, _ConsumerTag, _Msg}}, QTail} -> if CurrentDeliveryTag == DeliveryTag -> - {queue:in(UnackedMsg, ToAcc), queue:join(PrefixAcc, QTail)}; + {[UnackedMsg | ToAcc], queue:join(PrefixAcc, QTail)}; Multiple -> - collect_acks(queue:in(UnackedMsg, ToAcc), PrefixAcc, + collect_acks([UnackedMsg | ToAcc], PrefixAcc, QTail, DeliveryTag, Multiple); true -> collect_acks(ToAcc, queue:in(UnackedMsg, PrefixAcc), @@ -1312,7 +1313,7 @@ ack(Acked, State) -> maybe_incr_stats(QIncs, ack, State). new_tx(State) -> State#ch{uncommitted_message_q = queue:new(), - uncommitted_ack_q = queue:new()}. + uncommitted_acks = []}. notify_queues(State = #ch{state = closing}) -> {ok, State}; @@ -1320,12 +1321,15 @@ notify_queues(State = #ch{consumer_mapping = Consumers}) -> {rabbit_amqqueue:notify_down_all(consumer_queues(Consumers), self()), State#ch{state = closing}}. -fold_per_queue(F, Acc0, UAQ) -> - T = rabbit_misc:queue_fold( - fun ({_DTag, _CTag, {QPid, MsgId}}, T) -> - rabbit_misc:gb_trees_cons(QPid, MsgId, T) - end, gb_trees:empty(), UAQ), - rabbit_misc:gb_trees_fold(F, Acc0, T). +fold_per_queue(_F, Acc, []) -> + Acc; +fold_per_queue(F, Acc, [{_DTag, _CTag, {QPid, MsgId}}]) -> %% common case + F(QPid, [MsgId], Acc); +fold_per_queue(F, Acc, UAL) -> + T = lists:foldl(fun ({_DTag, _CTag, {QPid, MsgId}}, T) -> + rabbit_misc:gb_trees_cons(QPid, MsgId, T) + end, gb_trees:empty(), UAL), + rabbit_misc:gb_trees_fold(F, Acc, T). enable_limiter(State = #ch{unacked_message_q = UAMQ, limiter = Limiter}) -> @@ -1347,9 +1351,9 @@ consumer_queues(Consumers) -> notify_limiter(Limiter, Acked) -> case rabbit_limiter:is_enabled(Limiter) of false -> ok; - true -> case rabbit_misc:queue_fold(fun ({_, none, _}, Acc) -> Acc; - ({_, _, _}, Acc) -> Acc + 1 - end, 0, Acked) of + true -> case lists:foldl(fun ({_, none, _}, Acc) -> Acc; + ({_, _, _}, Acc) -> Acc + 1 + end, 0, Acked) of 0 -> ok; Count -> rabbit_limiter:ack(Limiter, Count) end @@ -1491,8 +1495,8 @@ i(messages_unacknowledged, #ch{unacked_message_q = UAMQ}) -> queue:len(UAMQ); i(messages_uncommitted, #ch{uncommitted_message_q = TMQ}) -> queue:len(TMQ); -i(acks_uncommitted, #ch{uncommitted_ack_q = TAQ}) -> - queue:len(TAQ); +i(acks_uncommitted, #ch{uncommitted_acks = TAL}) -> + length(TAL); i(prefetch_count, #ch{limiter = Limiter}) -> rabbit_limiter:get_limit(Limiter); i(client_flow_blocked, #ch{limiter = Limiter}) -> diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 1163ae9d86..e9f0cf6c54 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -91,6 +91,9 @@ start() -> print_error("invalid command '~s'", [string:join([atom_to_list(Command) | Args], " ")]), usage(); + {'EXIT', {badarg, _}} -> + print_error("invalid parameter: ~p", [Args]), + usage(); {error, Reason} -> print_error("~p", [Reason]), quit(2); @@ -321,6 +324,11 @@ action(trace_off, Node, [], Opts, Inform) -> Inform("Stopping tracing for vhost ~p", [VHost]), rpc_call(Node, rabbit_trace, stop, [list_to_binary(VHost)]); +action(set_vm_memory_high_watermark, Node, [Arg], _Opts, Inform) -> + Frac = list_to_float(Arg), + Inform("Setting memory threshhold on ~p to ~p", [Node, Frac]), + rpc_call(Node, vm_memory_monitor, set_vm_memory_high_watermark, [Frac]); + action(set_permissions, Node, [Username, CPerm, WPerm, RPerm], Opts, Inform) -> VHost = proplists:get_value(?VHOST_OPT, Opts), Inform("Setting permissions for user ~p in vhost ~p", [Username, VHost]), diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 24468a01fe..8a08d4b673 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -251,6 +251,7 @@ notify_queues(State = #lim{ch_pid = ChPid, queues = Queues}) -> end, {[], Queues}, Queues), case length(QList) of 0 -> ok; + 1 -> ok = rabbit_amqqueue:unblock(hd(QList), ChPid); %% common case L -> %% We randomly vary the position of queues in the list, %% thus ensuring that each queue has an equal chance of diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl index 996b0a980f..3bd8eeefdd 100644 --- a/src/rabbit_memory_monitor.erl +++ b/src/rabbit_memory_monitor.erl @@ -36,7 +36,6 @@ queue_durations, %% ets #process queue_duration_sum, %% sum of all queue_durations queue_duration_count, %% number of elements in sum - memory_limit, %% how much memory we intend to use desired_duration %% the desired queue duration }). @@ -63,9 +62,6 @@ -define(SUM_INC_THRESHOLD, 0.95). -define(SUM_INC_AMOUNT, 1.0). -%% If user disabled vm_memory_monitor, let's assume 1GB of memory we can use. --define(MEMORY_SIZE_FOR_DISABLED_VMM, 1073741824). - -define(EPSILON, 0.000001). %% less than this and we clamp to 0 %%---------------------------------------------------------------------------- @@ -110,13 +106,6 @@ stop() -> %%---------------------------------------------------------------------------- init([]) -> - MemoryLimit = trunc(?MEMORY_LIMIT_SCALING * - (try - vm_memory_monitor:get_memory_limit() - catch - exit:{noproc, _} -> ?MEMORY_SIZE_FOR_DISABLED_VMM - end)), - {ok, TRef} = timer:apply_interval(?DEFAULT_UPDATE_INTERVAL, ?SERVER, update, []), @@ -127,7 +116,6 @@ init([]) -> queue_durations = Ets, queue_duration_sum = 0.0, queue_duration_count = 0, - memory_limit = MemoryLimit, desired_duration = infinity })}. handle_call({report_ram_duration, Pid, QueueDuration}, From, @@ -223,12 +211,12 @@ internal_deregister(Pid, Demonitor, queue_duration_count = Count1 } end. -internal_update(State = #state { memory_limit = Limit, - queue_durations = Durations, +internal_update(State = #state { queue_durations = Durations, desired_duration = DesiredDurationAvg, queue_duration_sum = Sum, queue_duration_count = Count }) -> - MemoryRatio = erlang:memory(total) / Limit, + MemoryLimit = ?MEMORY_LIMIT_SCALING * vm_memory_monitor:get_memory_limit(), + MemoryRatio = erlang:memory(total) / MemoryLimit, DesiredDurationAvg1 = case MemoryRatio < ?LIMIT_THRESHOLD orelse Count == 0 of true -> diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 5fc6341f50..328fe639f7 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -95,7 +95,7 @@ init(#amqqueue { name = QName, mirror_nodes = MNodes } = Q, Recover, (case MNodes of all -> rabbit_mnesia:all_clustered_nodes(); undefined -> []; - _ -> [list_to_atom(binary_to_list(Node)) || Node <- MNodes] + _ -> MNodes end) -- [node()], [rabbit_mirror_queue_misc:add_mirror(QName, Node) || Node <- MNodes1], {ok, BQ} = application:get_env(backing_queue_module), diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index b1cf45e7fc..13a553f124 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -257,8 +257,15 @@ val({Type, Value}) -> end, lists:flatten(io_lib:format(Fmt, [Value, Type])). -dirty_read(ReadSpec) -> - case mnesia:dirty_read(ReadSpec) of +%% Normally we'd call mnesia:dirty_read/1 here, but that is quite +%% expensive due to general mnesia overheads (figuring out table types +%% and locations, etc). We get away with bypassing these because we +%% know that the tables we are looking at here +%% - are not the schema table +%% - have a local ram copy +%% - do not have any indices +dirty_read({Table, Key}) -> + case ets:lookup(Table, Key) of [Result] -> {ok, Result}; [] -> {error, not_found} end. diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index d453a8707e..e9c4479a63 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -107,9 +107,11 @@ check_delivery(true, _ , {false, []}) -> {unroutable, []}; check_delivery(_ , true, {_ , []}) -> {not_delivered, []}; check_delivery(_ , _ , {_ , Qs}) -> {routed, Qs}. +%% Normally we'd call mnesia:dirty_read/1 here, but that is quite +%% expensive for the reasons explained in rabbit_misc:dirty_read/1. lookup_qpids(QNames) -> lists:foldl(fun (QName, QPids) -> - case mnesia:dirty_read({rabbit_queue, QName}) of + case ets:lookup(rabbit_queue, QName) of [#amqqueue{pid = QPid, slave_pids = SPids}] -> [QPid | SPids ++ QPids]; [] -> @@ -118,16 +120,8 @@ lookup_qpids(QNames) -> end, [], QNames). %% Normally we'd call mnesia:dirty_select/2 here, but that is quite -%% expensive due to -%% -%% 1) general mnesia overheads (figuring out table types and -%% locations, etc). We get away with bypassing these because we know -%% that the table -%% - is not the schema table -%% - has a local ram copy -%% - does not have any indices -%% -%% 2) 'fixing' of the table with ets:safe_fixtable/2, which is wholly +%% expensive for the same reasons as above, and, additionally, due to +%% mnesia 'fixing' the table with ets:safe_fixtable/2, which is wholly %% unnecessary. According to the ets docs (and the code in erl_db.c), %% 'select' is safe anyway ("Functions that internally traverse over a %% table, like select and match, will give the same guarantee as diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 010279302e..39f67ced2d 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1251,6 +1251,9 @@ test_server_status() -> %% list consumers ok = control_action(list_consumers, []), + %% set vm memory high watermark + ok = control_action(set_vm_memory_high_watermark, ["1.0"]), + %% cleanup [{ok, _} = rabbit_amqqueue:delete(QR, false, false) || QR <- [Q, Q2]], diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index d4f51f8d4a..b853d98304 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -321,7 +321,7 @@ q3 :: bpqueue:bpqueue(), q4 :: queue(), next_seq_id :: seq_id(), - pending_ack :: dict(), + pending_ack :: gb_tree(), ram_ack_index :: gb_tree(), index_state :: any(), msg_store_clients :: 'undefined' | {{any(), binary()}, @@ -494,9 +494,31 @@ purge(State = #vqstate { q4 = Q4, ram_index_count = 0, persistent_count = PCount1 })}. -publish(Msg, MsgProps, _ChPid, State) -> - {_SeqId, State1} = publish(Msg, MsgProps, false, false, State), - a(reduce_memory_use(State1)). +publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, + MsgProps = #message_properties { needs_confirming = NeedsConfirming }, + _ChPid, State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4, + next_seq_id = SeqId, + len = Len, + in_counter = InCount, + persistent_count = PCount, + durable = IsDurable, + ram_msg_count = RamMsgCount, + unconfirmed = UC }) -> + IsPersistent1 = IsDurable andalso IsPersistent, + MsgStatus = msg_status(IsPersistent1, SeqId, Msg, MsgProps), + {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State), + State2 = case bpqueue:is_empty(Q3) of + false -> State1 #vqstate { q1 = queue:in(m(MsgStatus1), Q1) }; + true -> State1 #vqstate { q4 = queue:in(m(MsgStatus1), Q4) } + end, + PCount1 = PCount + one_if(IsPersistent1), + UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC), + a(reduce_memory_use(State2 #vqstate { next_seq_id = SeqId + 1, + len = Len + 1, + in_counter = InCount + 1, + persistent_count = PCount1, + ram_msg_count = RamMsgCount + 1, + unconfirmed = UC1 })). publish_delivered(false, #basic_message { id = MsgId }, #message_properties { needs_confirming = NeedsConfirming }, @@ -533,7 +555,11 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent, unconfirmed = UC1 }))}. drain_confirmed(State = #vqstate { confirmed = C }) -> - {gb_sets:to_list(C), State #vqstate { confirmed = gb_sets:new() }}. + case gb_sets:is_empty(C) of + true -> {[], State}; %% common case + false -> {gb_sets:to_list(C), State #vqstate { + confirmed = gb_sets:new() }} + end. dropwhile(Pred, State) -> case queue_out(State) of @@ -727,7 +753,7 @@ status(#vqstate { {q3 , bpqueue:len(Q3)}, {q4 , queue:len(Q4)}, {len , Len}, - {pending_acks , dict:size(PA)}, + {pending_acks , gb_trees:size(PA)}, {target_ram_count , TargetRamCount}, {ram_msg_count , RamMsgCount}, {ram_ack_count , gb_trees:size(RAI)}, @@ -864,7 +890,7 @@ betas_from_index_entries(List, TransientThreshold, PA, IndexState) -> true -> {Filtered1, cons_if(not IsDelivered, SeqId, Delivers1), [SeqId | Acks1]}; - false -> case dict:is_key(SeqId, PA) of + false -> case gb_trees:is_defined(SeqId, PA) of false -> {[m(#msg_status { seq_id = SeqId, msg_id = MsgId, @@ -941,7 +967,7 @@ init(IsDurable, IndexState, DeltaCount, Terms, AsyncCallback, q3 = bpqueue:new(), q4 = queue:new(), next_seq_id = NextSeqId, - pending_ack = dict:new(), + pending_ack = gb_trees:empty(), ram_ack_index = gb_trees:empty(), index_state = IndexState1, msg_store_clients = {PersistentClient, TransientClient}, @@ -1113,34 +1139,6 @@ sum_msg_ids_by_store_to_len(LensByStore, MsgIdsByStore) -> %% Internal gubbins for publishing %%---------------------------------------------------------------------------- -publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, - MsgProps = #message_properties { needs_confirming = NeedsConfirming }, - IsDelivered, MsgOnDisk, - State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4, - next_seq_id = SeqId, - len = Len, - in_counter = InCount, - persistent_count = PCount, - durable = IsDurable, - ram_msg_count = RamMsgCount, - unconfirmed = UC }) -> - IsPersistent1 = IsDurable andalso IsPersistent, - MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProps)) - #msg_status { is_delivered = IsDelivered, msg_on_disk = MsgOnDisk}, - {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State), - State2 = case bpqueue:is_empty(Q3) of - false -> State1 #vqstate { q1 = queue:in(m(MsgStatus1), Q1) }; - true -> State1 #vqstate { q4 = queue:in(m(MsgStatus1), Q4) } - end, - PCount1 = PCount + one_if(IsPersistent1), - UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC), - {SeqId, State2 #vqstate { next_seq_id = SeqId + 1, - len = Len + 1, - in_counter = InCount + 1, - persistent_count = PCount1, - ram_msg_count = RamMsgCount + 1, - unconfirmed = UC1 }}. - maybe_write_msg_to_disk(_Force, MsgStatus = #msg_status { msg_on_disk = true }, _MSCState) -> MsgStatus; @@ -1199,15 +1197,14 @@ record_pending_ack(#msg_status { seq_id = SeqId, true -> {m(trim_msg_status(MsgStatus)), RAI}; false -> {MsgStatus, gb_trees:insert(SeqId, MsgId, RAI)} end, - PA1 = dict:store(SeqId, AckEntry, PA), - State #vqstate { pending_ack = PA1, + State #vqstate { pending_ack = gb_trees:insert(SeqId, AckEntry, PA), ram_ack_index = RAI1, ack_in_counter = AckInCount + 1}. remove_pending_ack(SeqId, State = #vqstate { pending_ack = PA, ram_ack_index = RAI }) -> - {dict:fetch(SeqId, PA), - State #vqstate { pending_ack = dict:erase(SeqId, PA), + {gb_trees:get(SeqId, PA), + State #vqstate { pending_ack = gb_trees:delete(SeqId, PA), ram_ack_index = gb_trees:delete_any(SeqId, RAI) }}. purge_pending_ack(KeepPersistent, @@ -1215,10 +1212,10 @@ purge_pending_ack(KeepPersistent, index_state = IndexState, msg_store_clients = MSCState }) -> {IndexOnDiskSeqIds, MsgIdsByStore, _AllMsgIds} = - dict:fold(fun (_SeqId, MsgStatus, Acc) -> - accumulate_ack(MsgStatus, Acc) - end, accumulate_ack_init(), PA), - State1 = State #vqstate { pending_ack = dict:new(), + rabbit_misc:gb_trees_fold(fun (_SeqId, MsgStatus, Acc) -> + accumulate_ack(MsgStatus, Acc) + end, accumulate_ack_init(), PA), + State1 = State #vqstate { pending_ack = gb_trees:empty(), ram_ack_index = gb_trees:empty() }, case KeepPersistent of true -> case orddict:find(false, MsgIdsByStore) of @@ -1513,10 +1510,10 @@ limit_ram_acks(Quota, State = #vqstate { pending_ack = PA, false -> {SeqId, MsgId, RAI1} = gb_trees:take_largest(RAI), MsgStatus = #msg_status { msg_id = MsgId, is_persistent = false} = - dict:fetch(SeqId, PA), + gb_trees:get(SeqId, PA), {MsgStatus1, State1} = maybe_write_to_disk(true, false, MsgStatus, State), - PA1 = dict:store(SeqId, m(trim_msg_status(MsgStatus1)), PA), + PA1 = gb_trees:update(SeqId, m(trim_msg_status(MsgStatus1)), PA), limit_ram_acks(Quota - 1, State1 #vqstate { pending_ack = PA1, ram_ack_index = RAI1 }) diff --git a/src/vm_memory_monitor.erl b/src/vm_memory_monitor.erl index a54bf996f4..35ee1e5165 100644 --- a/src/vm_memory_monitor.erl +++ b/src/vm_memory_monitor.erl @@ -40,6 +40,7 @@ -define(SERVER, ?MODULE). -define(DEFAULT_MEMORY_CHECK_INTERVAL, 1000). +-define(ONE_MB, 1048576). %% For an unknown OS, we assume that we have 1GB of memory. It'll be %% wrong. Scale by vm_memory_high_watermark in configuration to get a @@ -106,35 +107,20 @@ start_link(Args) -> gen_server:start_link({local, ?SERVER}, ?MODULE, [Args], []). init([MemFraction]) -> - TotalMemory = - case get_total_memory() of - unknown -> - error_logger:warning_msg( - "Unknown total memory size for your OS ~p. " - "Assuming memory size is ~pMB.~n", - [os:type(), trunc(?MEMORY_SIZE_FOR_UNKNOWN_OS/1048576)]), - ?MEMORY_SIZE_FOR_UNKNOWN_OS; - M -> M - end, - MemLimit = get_mem_limit(MemFraction, TotalMemory), - error_logger:info_msg("Memory limit set to ~pMB.~n", - [trunc(MemLimit/1048576)]), TRef = start_timer(?DEFAULT_MEMORY_CHECK_INTERVAL), - State = #state { total_memory = TotalMemory, - memory_limit = MemLimit, - timeout = ?DEFAULT_MEMORY_CHECK_INTERVAL, + State = #state { timeout = ?DEFAULT_MEMORY_CHECK_INTERVAL, timer = TRef, alarmed = false}, - {ok, internal_update(State)}. + {ok, set_mem_limits(State, MemFraction)}. handle_call(get_vm_memory_high_watermark, _From, State) -> {reply, State#state.memory_limit / State#state.total_memory, State}; handle_call({set_vm_memory_high_watermark, MemFraction}, _From, State) -> - MemLimit = get_mem_limit(MemFraction, State#state.total_memory), + State1 = set_mem_limits(State, MemFraction), error_logger:info_msg("Memory alarm changed to ~p, ~p bytes.~n", - [MemFraction, MemLimit]), - {reply, ok, State#state{memory_limit = MemLimit}}; + [MemFraction, State1#state.memory_limit]), + {reply, ok, State1}; handle_call(get_check_interval, _From, State) -> {reply, State#state.timeout, State}; @@ -168,6 +154,30 @@ code_change(_OldVsn, State, _Extra) -> %% Server Internals %%---------------------------------------------------------------------------- +set_mem_limits(State, MemFraction) -> + TotalMemory = + case get_total_memory() of + unknown -> + case State of + #state { total_memory = undefined, + memory_limit = undefined } -> + error_logger:warning_msg( + "Unknown total memory size for your OS ~p. " + "Assuming memory size is ~pMB.~n", + [os:type(), + trunc(?MEMORY_SIZE_FOR_UNKNOWN_OS/?ONE_MB)]); + _ -> + ok + end, + ?MEMORY_SIZE_FOR_UNKNOWN_OS; + M -> M + end, + MemLim = get_mem_limit(MemFraction, TotalMemory), + error_logger:info_msg("Memory limit set to ~pMB of ~pMB total.~n", + [trunc(MemLim/?ONE_MB), trunc(TotalMemory/?ONE_MB)]), + internal_update(State #state { total_memory = TotalMemory, + memory_limit = MemLim }). + internal_update(State = #state { memory_limit = MemLimit, alarmed = Alarmed}) -> MemUsed = erlang:memory(total), @@ -322,9 +332,9 @@ parse_line_sunos(Line) -> [Value1 | UnitsRest] = string:tokens(RHS, " "), Value2 = case UnitsRest of ["Gigabytes"] -> - list_to_integer(Value1) * 1024 * 1024 * 1024; + list_to_integer(Value1) * ?ONE_MB * 1024; ["Megabytes"] -> - list_to_integer(Value1) * 1024 * 1024; + list_to_integer(Value1) * ?ONE_MB; ["Kilobytes"] -> list_to_integer(Value1) * 1024; _ -> |
