summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorEmile Joubert <emile@rabbitmq.com>2011-10-06 16:12:13 +0100
committerEmile Joubert <emile@rabbitmq.com>2011-10-06 16:12:13 +0100
commitf88351e1078750fcfc5f8b8a66bda5a3b1aece12 (patch)
tree04aff3de5e70405f7cafdcc283e44a8e80d371b3 /src
parentb1ef2b126a7e59775c28702e3df18be926b33dbe (diff)
parentcb360e2c53d982631201e5c3fc33589528853cfd (diff)
downloadrabbitmq-server-git-f88351e1078750fcfc5f8b8a66bda5a3b1aece12.tar.gz
Merged bug24461 into default
Diffstat (limited to 'src')
-rw-r--r--src/rabbit.erl8
-rw-r--r--src/rabbit_channel.erl64
-rw-r--r--src/rabbit_control.erl8
-rw-r--r--src/rabbit_limiter.erl1
-rw-r--r--src/rabbit_memory_monitor.erl18
-rw-r--r--src/rabbit_mirror_queue_master.erl2
-rw-r--r--src/rabbit_misc.erl11
-rw-r--r--src/rabbit_router.erl16
-rw-r--r--src/rabbit_tests.erl3
-rw-r--r--src/rabbit_variable_queue.erl89
-rw-r--r--src/vm_memory_monitor.erl54
11 files changed, 146 insertions, 128 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 47bc443303..e98ca9be33 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -249,7 +249,13 @@ status() ->
{running_applications, application:which_applications(infinity)},
{os, os:type()},
{erlang_version, erlang:system_info(system_version)},
- {memory, erlang:memory()}].
+ {memory, erlang:memory()}] ++
+ rabbit_misc:filter_exit_map(
+ fun ({Key, {M, F, A}}) -> {Key, erlang:apply(M, F, A)} end,
+ [{vm_memory_high_watermark, {vm_memory_monitor,
+ get_vm_memory_high_watermark, []}},
+ {vm_memory_limit, {vm_memory_monitor,
+ get_memory_limit, []}}]).
is_running() -> is_running(node()).
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 706aa73bc2..883e570ad6 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -34,7 +34,7 @@
-record(ch, {state, protocol, channel, reader_pid, writer_pid, conn_pid,
limiter, tx_status, next_tag,
- unacked_message_q, uncommitted_message_q, uncommitted_ack_q,
+ unacked_message_q, uncommitted_message_q, uncommitted_acks,
user, virtual_host, most_recently_declared_queue, queue_monitors,
consumer_mapping, blocking, queue_consumers, queue_collector_pid,
stats_timer, confirm_enabled, publish_seqno, unconfirmed_mq,
@@ -184,7 +184,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost,
next_tag = 1,
unacked_message_q = queue:new(),
uncommitted_message_q = queue:new(),
- uncommitted_ack_q = queue:new(),
+ uncommitted_acks = [],
user = User,
virtual_host = VHost,
most_recently_declared_queue = <<>>,
@@ -667,15 +667,14 @@ handle_method(#'basic.nack'{delivery_tag = DeliveryTag,
handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
multiple = Multiple},
- _, State = #ch{unacked_message_q = UAMQ,
- tx_status = TxStatus}) ->
+ _, State = #ch{unacked_message_q = UAMQ, tx_status = TxStatus}) ->
{Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple),
State1 = State#ch{unacked_message_q = Remaining},
{noreply,
case TxStatus of
none -> ack(Acked, State1);
- in_progress -> NewTAQ = queue:join(State1#ch.uncommitted_ack_q, Acked),
- State1#ch{uncommitted_ack_q = NewTAQ}
+ in_progress -> State1#ch{uncommitted_acks =
+ Acked ++ State1#ch.uncommitted_acks}
end};
handle_method(#'basic.get'{queue = QueueNameBin,
@@ -837,6 +836,7 @@ handle_method(#'basic.recover_async'{requeue = true},
_, State = #ch{unacked_message_q = UAMQ,
limiter = Limiter}) ->
OkFun = fun () -> ok end,
+ UAMQL = queue:to_list(UAMQ),
ok = fold_per_queue(
fun (QPid, MsgIds, ok) ->
rabbit_misc:with_exit_handler(
@@ -844,8 +844,8 @@ handle_method(#'basic.recover_async'{requeue = true},
rabbit_amqqueue:requeue(
QPid, MsgIds, self())
end)
- end, ok, UAMQ),
- ok = notify_limiter(Limiter, UAMQ),
+ end, ok, UAMQL),
+ ok = notify_limiter(Limiter, UAMQL),
%% No answer required - basic.recover is the newer, synchronous
%% variant of this method
{noreply, State#ch{unacked_message_q = queue:new()}};
@@ -1069,8 +1069,8 @@ handle_method(#'tx.commit'{}, _, #ch{tx_status = none}) ->
precondition_failed, "channel is not transactional", []);
handle_method(#'tx.commit'{}, _, State = #ch{uncommitted_message_q = TMQ,
- uncommitted_ack_q = TAQ}) ->
- State1 = new_tx(ack(TAQ, rabbit_misc:queue_fold(fun deliver_to_queues/2,
+ uncommitted_acks = TAL}) ->
+ State1 = new_tx(ack(TAL, rabbit_misc:queue_fold(fun deliver_to_queues/2,
State, TMQ))),
{noreply, maybe_complete_tx(State1#ch{tx_status = committing})};
@@ -1078,10 +1078,11 @@ handle_method(#'tx.rollback'{}, _, #ch{tx_status = none}) ->
rabbit_misc:protocol_error(
precondition_failed, "channel is not transactional", []);
-handle_method(#'tx.rollback'{}, _, State = #ch{unacked_message_q = UAMQ,
- uncommitted_ack_q = TAQ}) ->
- {reply, #'tx.rollback_ok'{}, new_tx(State#ch{unacked_message_q =
- queue:join(TAQ, UAMQ)})};
+handle_method(#'tx.rollback'{}, _, State = #ch{unacked_message_q = UAMQ,
+ uncommitted_acks = TAL}) ->
+ TAQ = queue:from_list(lists:reverse(TAL)),
+ {reply, #'tx.rollback_ok'{},
+ new_tx(State#ch{unacked_message_q = queue:join(TAQ, UAMQ)})};
handle_method(#'confirm.select'{}, _, #ch{tx_status = in_progress}) ->
rabbit_misc:protocol_error(
@@ -1280,18 +1281,18 @@ ack_record(DeliveryTag, ConsumerTag,
{DeliveryTag, ConsumerTag, {QPid, MsgId}}.
collect_acks(Q, 0, true) ->
- {Q, queue:new()};
+ {queue:to_list(Q), queue:new()};
collect_acks(Q, DeliveryTag, Multiple) ->
- collect_acks(queue:new(), queue:new(), Q, DeliveryTag, Multiple).
+ collect_acks([], queue:new(), Q, DeliveryTag, Multiple).
collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) ->
case queue:out(Q) of
{{value, UnackedMsg = {CurrentDeliveryTag, _ConsumerTag, _Msg}},
QTail} ->
if CurrentDeliveryTag == DeliveryTag ->
- {queue:in(UnackedMsg, ToAcc), queue:join(PrefixAcc, QTail)};
+ {[UnackedMsg | ToAcc], queue:join(PrefixAcc, QTail)};
Multiple ->
- collect_acks(queue:in(UnackedMsg, ToAcc), PrefixAcc,
+ collect_acks([UnackedMsg | ToAcc], PrefixAcc,
QTail, DeliveryTag, Multiple);
true ->
collect_acks(ToAcc, queue:in(UnackedMsg, PrefixAcc),
@@ -1312,7 +1313,7 @@ ack(Acked, State) ->
maybe_incr_stats(QIncs, ack, State).
new_tx(State) -> State#ch{uncommitted_message_q = queue:new(),
- uncommitted_ack_q = queue:new()}.
+ uncommitted_acks = []}.
notify_queues(State = #ch{state = closing}) ->
{ok, State};
@@ -1320,12 +1321,15 @@ notify_queues(State = #ch{consumer_mapping = Consumers}) ->
{rabbit_amqqueue:notify_down_all(consumer_queues(Consumers), self()),
State#ch{state = closing}}.
-fold_per_queue(F, Acc0, UAQ) ->
- T = rabbit_misc:queue_fold(
- fun ({_DTag, _CTag, {QPid, MsgId}}, T) ->
- rabbit_misc:gb_trees_cons(QPid, MsgId, T)
- end, gb_trees:empty(), UAQ),
- rabbit_misc:gb_trees_fold(F, Acc0, T).
+fold_per_queue(_F, Acc, []) ->
+ Acc;
+fold_per_queue(F, Acc, [{_DTag, _CTag, {QPid, MsgId}}]) -> %% common case
+ F(QPid, [MsgId], Acc);
+fold_per_queue(F, Acc, UAL) ->
+ T = lists:foldl(fun ({_DTag, _CTag, {QPid, MsgId}}, T) ->
+ rabbit_misc:gb_trees_cons(QPid, MsgId, T)
+ end, gb_trees:empty(), UAL),
+ rabbit_misc:gb_trees_fold(F, Acc, T).
enable_limiter(State = #ch{unacked_message_q = UAMQ,
limiter = Limiter}) ->
@@ -1347,9 +1351,9 @@ consumer_queues(Consumers) ->
notify_limiter(Limiter, Acked) ->
case rabbit_limiter:is_enabled(Limiter) of
false -> ok;
- true -> case rabbit_misc:queue_fold(fun ({_, none, _}, Acc) -> Acc;
- ({_, _, _}, Acc) -> Acc + 1
- end, 0, Acked) of
+ true -> case lists:foldl(fun ({_, none, _}, Acc) -> Acc;
+ ({_, _, _}, Acc) -> Acc + 1
+ end, 0, Acked) of
0 -> ok;
Count -> rabbit_limiter:ack(Limiter, Count)
end
@@ -1491,8 +1495,8 @@ i(messages_unacknowledged, #ch{unacked_message_q = UAMQ}) ->
queue:len(UAMQ);
i(messages_uncommitted, #ch{uncommitted_message_q = TMQ}) ->
queue:len(TMQ);
-i(acks_uncommitted, #ch{uncommitted_ack_q = TAQ}) ->
- queue:len(TAQ);
+i(acks_uncommitted, #ch{uncommitted_acks = TAL}) ->
+ length(TAL);
i(prefetch_count, #ch{limiter = Limiter}) ->
rabbit_limiter:get_limit(Limiter);
i(client_flow_blocked, #ch{limiter = Limiter}) ->
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 1163ae9d86..e9f0cf6c54 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -91,6 +91,9 @@ start() ->
print_error("invalid command '~s'",
[string:join([atom_to_list(Command) | Args], " ")]),
usage();
+ {'EXIT', {badarg, _}} ->
+ print_error("invalid parameter: ~p", [Args]),
+ usage();
{error, Reason} ->
print_error("~p", [Reason]),
quit(2);
@@ -321,6 +324,11 @@ action(trace_off, Node, [], Opts, Inform) ->
Inform("Stopping tracing for vhost ~p", [VHost]),
rpc_call(Node, rabbit_trace, stop, [list_to_binary(VHost)]);
+action(set_vm_memory_high_watermark, Node, [Arg], _Opts, Inform) ->
+ Frac = list_to_float(Arg),
+ Inform("Setting memory threshhold on ~p to ~p", [Node, Frac]),
+ rpc_call(Node, vm_memory_monitor, set_vm_memory_high_watermark, [Frac]);
+
action(set_permissions, Node, [Username, CPerm, WPerm, RPerm], Opts, Inform) ->
VHost = proplists:get_value(?VHOST_OPT, Opts),
Inform("Setting permissions for user ~p in vhost ~p", [Username, VHost]),
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 24468a01fe..8a08d4b673 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -251,6 +251,7 @@ notify_queues(State = #lim{ch_pid = ChPid, queues = Queues}) ->
end, {[], Queues}, Queues),
case length(QList) of
0 -> ok;
+ 1 -> ok = rabbit_amqqueue:unblock(hd(QList), ChPid); %% common case
L ->
%% We randomly vary the position of queues in the list,
%% thus ensuring that each queue has an equal chance of
diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl
index 996b0a980f..3bd8eeefdd 100644
--- a/src/rabbit_memory_monitor.erl
+++ b/src/rabbit_memory_monitor.erl
@@ -36,7 +36,6 @@
queue_durations, %% ets #process
queue_duration_sum, %% sum of all queue_durations
queue_duration_count, %% number of elements in sum
- memory_limit, %% how much memory we intend to use
desired_duration %% the desired queue duration
}).
@@ -63,9 +62,6 @@
-define(SUM_INC_THRESHOLD, 0.95).
-define(SUM_INC_AMOUNT, 1.0).
-%% If user disabled vm_memory_monitor, let's assume 1GB of memory we can use.
--define(MEMORY_SIZE_FOR_DISABLED_VMM, 1073741824).
-
-define(EPSILON, 0.000001). %% less than this and we clamp to 0
%%----------------------------------------------------------------------------
@@ -110,13 +106,6 @@ stop() ->
%%----------------------------------------------------------------------------
init([]) ->
- MemoryLimit = trunc(?MEMORY_LIMIT_SCALING *
- (try
- vm_memory_monitor:get_memory_limit()
- catch
- exit:{noproc, _} -> ?MEMORY_SIZE_FOR_DISABLED_VMM
- end)),
-
{ok, TRef} = timer:apply_interval(?DEFAULT_UPDATE_INTERVAL,
?SERVER, update, []),
@@ -127,7 +116,6 @@ init([]) ->
queue_durations = Ets,
queue_duration_sum = 0.0,
queue_duration_count = 0,
- memory_limit = MemoryLimit,
desired_duration = infinity })}.
handle_call({report_ram_duration, Pid, QueueDuration}, From,
@@ -223,12 +211,12 @@ internal_deregister(Pid, Demonitor,
queue_duration_count = Count1 }
end.
-internal_update(State = #state { memory_limit = Limit,
- queue_durations = Durations,
+internal_update(State = #state { queue_durations = Durations,
desired_duration = DesiredDurationAvg,
queue_duration_sum = Sum,
queue_duration_count = Count }) ->
- MemoryRatio = erlang:memory(total) / Limit,
+ MemoryLimit = ?MEMORY_LIMIT_SCALING * vm_memory_monitor:get_memory_limit(),
+ MemoryRatio = erlang:memory(total) / MemoryLimit,
DesiredDurationAvg1 =
case MemoryRatio < ?LIMIT_THRESHOLD orelse Count == 0 of
true ->
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 5fc6341f50..328fe639f7 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -95,7 +95,7 @@ init(#amqqueue { name = QName, mirror_nodes = MNodes } = Q, Recover,
(case MNodes of
all -> rabbit_mnesia:all_clustered_nodes();
undefined -> [];
- _ -> [list_to_atom(binary_to_list(Node)) || Node <- MNodes]
+ _ -> MNodes
end) -- [node()],
[rabbit_mirror_queue_misc:add_mirror(QName, Node) || Node <- MNodes1],
{ok, BQ} = application:get_env(backing_queue_module),
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index b1cf45e7fc..13a553f124 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -257,8 +257,15 @@ val({Type, Value}) ->
end,
lists:flatten(io_lib:format(Fmt, [Value, Type])).
-dirty_read(ReadSpec) ->
- case mnesia:dirty_read(ReadSpec) of
+%% Normally we'd call mnesia:dirty_read/1 here, but that is quite
+%% expensive due to general mnesia overheads (figuring out table types
+%% and locations, etc). We get away with bypassing these because we
+%% know that the tables we are looking at here
+%% - are not the schema table
+%% - have a local ram copy
+%% - do not have any indices
+dirty_read({Table, Key}) ->
+ case ets:lookup(Table, Key) of
[Result] -> {ok, Result};
[] -> {error, not_found}
end.
diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl
index d453a8707e..e9c4479a63 100644
--- a/src/rabbit_router.erl
+++ b/src/rabbit_router.erl
@@ -107,9 +107,11 @@ check_delivery(true, _ , {false, []}) -> {unroutable, []};
check_delivery(_ , true, {_ , []}) -> {not_delivered, []};
check_delivery(_ , _ , {_ , Qs}) -> {routed, Qs}.
+%% Normally we'd call mnesia:dirty_read/1 here, but that is quite
+%% expensive for the reasons explained in rabbit_misc:dirty_read/1.
lookup_qpids(QNames) ->
lists:foldl(fun (QName, QPids) ->
- case mnesia:dirty_read({rabbit_queue, QName}) of
+ case ets:lookup(rabbit_queue, QName) of
[#amqqueue{pid = QPid, slave_pids = SPids}] ->
[QPid | SPids ++ QPids];
[] ->
@@ -118,16 +120,8 @@ lookup_qpids(QNames) ->
end, [], QNames).
%% Normally we'd call mnesia:dirty_select/2 here, but that is quite
-%% expensive due to
-%%
-%% 1) general mnesia overheads (figuring out table types and
-%% locations, etc). We get away with bypassing these because we know
-%% that the table
-%% - is not the schema table
-%% - has a local ram copy
-%% - does not have any indices
-%%
-%% 2) 'fixing' of the table with ets:safe_fixtable/2, which is wholly
+%% expensive for the same reasons as above, and, additionally, due to
+%% mnesia 'fixing' the table with ets:safe_fixtable/2, which is wholly
%% unnecessary. According to the ets docs (and the code in erl_db.c),
%% 'select' is safe anyway ("Functions that internally traverse over a
%% table, like select and match, will give the same guarantee as
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 010279302e..39f67ced2d 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1251,6 +1251,9 @@ test_server_status() ->
%% list consumers
ok = control_action(list_consumers, []),
+ %% set vm memory high watermark
+ ok = control_action(set_vm_memory_high_watermark, ["1.0"]),
+
%% cleanup
[{ok, _} = rabbit_amqqueue:delete(QR, false, false) || QR <- [Q, Q2]],
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index d4f51f8d4a..b853d98304 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -321,7 +321,7 @@
q3 :: bpqueue:bpqueue(),
q4 :: queue(),
next_seq_id :: seq_id(),
- pending_ack :: dict(),
+ pending_ack :: gb_tree(),
ram_ack_index :: gb_tree(),
index_state :: any(),
msg_store_clients :: 'undefined' | {{any(), binary()},
@@ -494,9 +494,31 @@ purge(State = #vqstate { q4 = Q4,
ram_index_count = 0,
persistent_count = PCount1 })}.
-publish(Msg, MsgProps, _ChPid, State) ->
- {_SeqId, State1} = publish(Msg, MsgProps, false, false, State),
- a(reduce_memory_use(State1)).
+publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
+ MsgProps = #message_properties { needs_confirming = NeedsConfirming },
+ _ChPid, State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4,
+ next_seq_id = SeqId,
+ len = Len,
+ in_counter = InCount,
+ persistent_count = PCount,
+ durable = IsDurable,
+ ram_msg_count = RamMsgCount,
+ unconfirmed = UC }) ->
+ IsPersistent1 = IsDurable andalso IsPersistent,
+ MsgStatus = msg_status(IsPersistent1, SeqId, Msg, MsgProps),
+ {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
+ State2 = case bpqueue:is_empty(Q3) of
+ false -> State1 #vqstate { q1 = queue:in(m(MsgStatus1), Q1) };
+ true -> State1 #vqstate { q4 = queue:in(m(MsgStatus1), Q4) }
+ end,
+ PCount1 = PCount + one_if(IsPersistent1),
+ UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
+ a(reduce_memory_use(State2 #vqstate { next_seq_id = SeqId + 1,
+ len = Len + 1,
+ in_counter = InCount + 1,
+ persistent_count = PCount1,
+ ram_msg_count = RamMsgCount + 1,
+ unconfirmed = UC1 })).
publish_delivered(false, #basic_message { id = MsgId },
#message_properties { needs_confirming = NeedsConfirming },
@@ -533,7 +555,11 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent,
unconfirmed = UC1 }))}.
drain_confirmed(State = #vqstate { confirmed = C }) ->
- {gb_sets:to_list(C), State #vqstate { confirmed = gb_sets:new() }}.
+ case gb_sets:is_empty(C) of
+ true -> {[], State}; %% common case
+ false -> {gb_sets:to_list(C), State #vqstate {
+ confirmed = gb_sets:new() }}
+ end.
dropwhile(Pred, State) ->
case queue_out(State) of
@@ -727,7 +753,7 @@ status(#vqstate {
{q3 , bpqueue:len(Q3)},
{q4 , queue:len(Q4)},
{len , Len},
- {pending_acks , dict:size(PA)},
+ {pending_acks , gb_trees:size(PA)},
{target_ram_count , TargetRamCount},
{ram_msg_count , RamMsgCount},
{ram_ack_count , gb_trees:size(RAI)},
@@ -864,7 +890,7 @@ betas_from_index_entries(List, TransientThreshold, PA, IndexState) ->
true -> {Filtered1,
cons_if(not IsDelivered, SeqId, Delivers1),
[SeqId | Acks1]};
- false -> case dict:is_key(SeqId, PA) of
+ false -> case gb_trees:is_defined(SeqId, PA) of
false -> {[m(#msg_status {
seq_id = SeqId,
msg_id = MsgId,
@@ -941,7 +967,7 @@ init(IsDurable, IndexState, DeltaCount, Terms, AsyncCallback,
q3 = bpqueue:new(),
q4 = queue:new(),
next_seq_id = NextSeqId,
- pending_ack = dict:new(),
+ pending_ack = gb_trees:empty(),
ram_ack_index = gb_trees:empty(),
index_state = IndexState1,
msg_store_clients = {PersistentClient, TransientClient},
@@ -1113,34 +1139,6 @@ sum_msg_ids_by_store_to_len(LensByStore, MsgIdsByStore) ->
%% Internal gubbins for publishing
%%----------------------------------------------------------------------------
-publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
- MsgProps = #message_properties { needs_confirming = NeedsConfirming },
- IsDelivered, MsgOnDisk,
- State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4,
- next_seq_id = SeqId,
- len = Len,
- in_counter = InCount,
- persistent_count = PCount,
- durable = IsDurable,
- ram_msg_count = RamMsgCount,
- unconfirmed = UC }) ->
- IsPersistent1 = IsDurable andalso IsPersistent,
- MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProps))
- #msg_status { is_delivered = IsDelivered, msg_on_disk = MsgOnDisk},
- {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
- State2 = case bpqueue:is_empty(Q3) of
- false -> State1 #vqstate { q1 = queue:in(m(MsgStatus1), Q1) };
- true -> State1 #vqstate { q4 = queue:in(m(MsgStatus1), Q4) }
- end,
- PCount1 = PCount + one_if(IsPersistent1),
- UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
- {SeqId, State2 #vqstate { next_seq_id = SeqId + 1,
- len = Len + 1,
- in_counter = InCount + 1,
- persistent_count = PCount1,
- ram_msg_count = RamMsgCount + 1,
- unconfirmed = UC1 }}.
-
maybe_write_msg_to_disk(_Force, MsgStatus = #msg_status {
msg_on_disk = true }, _MSCState) ->
MsgStatus;
@@ -1199,15 +1197,14 @@ record_pending_ack(#msg_status { seq_id = SeqId,
true -> {m(trim_msg_status(MsgStatus)), RAI};
false -> {MsgStatus, gb_trees:insert(SeqId, MsgId, RAI)}
end,
- PA1 = dict:store(SeqId, AckEntry, PA),
- State #vqstate { pending_ack = PA1,
+ State #vqstate { pending_ack = gb_trees:insert(SeqId, AckEntry, PA),
ram_ack_index = RAI1,
ack_in_counter = AckInCount + 1}.
remove_pending_ack(SeqId, State = #vqstate { pending_ack = PA,
ram_ack_index = RAI }) ->
- {dict:fetch(SeqId, PA),
- State #vqstate { pending_ack = dict:erase(SeqId, PA),
+ {gb_trees:get(SeqId, PA),
+ State #vqstate { pending_ack = gb_trees:delete(SeqId, PA),
ram_ack_index = gb_trees:delete_any(SeqId, RAI) }}.
purge_pending_ack(KeepPersistent,
@@ -1215,10 +1212,10 @@ purge_pending_ack(KeepPersistent,
index_state = IndexState,
msg_store_clients = MSCState }) ->
{IndexOnDiskSeqIds, MsgIdsByStore, _AllMsgIds} =
- dict:fold(fun (_SeqId, MsgStatus, Acc) ->
- accumulate_ack(MsgStatus, Acc)
- end, accumulate_ack_init(), PA),
- State1 = State #vqstate { pending_ack = dict:new(),
+ rabbit_misc:gb_trees_fold(fun (_SeqId, MsgStatus, Acc) ->
+ accumulate_ack(MsgStatus, Acc)
+ end, accumulate_ack_init(), PA),
+ State1 = State #vqstate { pending_ack = gb_trees:empty(),
ram_ack_index = gb_trees:empty() },
case KeepPersistent of
true -> case orddict:find(false, MsgIdsByStore) of
@@ -1513,10 +1510,10 @@ limit_ram_acks(Quota, State = #vqstate { pending_ack = PA,
false ->
{SeqId, MsgId, RAI1} = gb_trees:take_largest(RAI),
MsgStatus = #msg_status { msg_id = MsgId, is_persistent = false} =
- dict:fetch(SeqId, PA),
+ gb_trees:get(SeqId, PA),
{MsgStatus1, State1} =
maybe_write_to_disk(true, false, MsgStatus, State),
- PA1 = dict:store(SeqId, m(trim_msg_status(MsgStatus1)), PA),
+ PA1 = gb_trees:update(SeqId, m(trim_msg_status(MsgStatus1)), PA),
limit_ram_acks(Quota - 1,
State1 #vqstate { pending_ack = PA1,
ram_ack_index = RAI1 })
diff --git a/src/vm_memory_monitor.erl b/src/vm_memory_monitor.erl
index a54bf996f4..35ee1e5165 100644
--- a/src/vm_memory_monitor.erl
+++ b/src/vm_memory_monitor.erl
@@ -40,6 +40,7 @@
-define(SERVER, ?MODULE).
-define(DEFAULT_MEMORY_CHECK_INTERVAL, 1000).
+-define(ONE_MB, 1048576).
%% For an unknown OS, we assume that we have 1GB of memory. It'll be
%% wrong. Scale by vm_memory_high_watermark in configuration to get a
@@ -106,35 +107,20 @@ start_link(Args) ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [Args], []).
init([MemFraction]) ->
- TotalMemory =
- case get_total_memory() of
- unknown ->
- error_logger:warning_msg(
- "Unknown total memory size for your OS ~p. "
- "Assuming memory size is ~pMB.~n",
- [os:type(), trunc(?MEMORY_SIZE_FOR_UNKNOWN_OS/1048576)]),
- ?MEMORY_SIZE_FOR_UNKNOWN_OS;
- M -> M
- end,
- MemLimit = get_mem_limit(MemFraction, TotalMemory),
- error_logger:info_msg("Memory limit set to ~pMB.~n",
- [trunc(MemLimit/1048576)]),
TRef = start_timer(?DEFAULT_MEMORY_CHECK_INTERVAL),
- State = #state { total_memory = TotalMemory,
- memory_limit = MemLimit,
- timeout = ?DEFAULT_MEMORY_CHECK_INTERVAL,
+ State = #state { timeout = ?DEFAULT_MEMORY_CHECK_INTERVAL,
timer = TRef,
alarmed = false},
- {ok, internal_update(State)}.
+ {ok, set_mem_limits(State, MemFraction)}.
handle_call(get_vm_memory_high_watermark, _From, State) ->
{reply, State#state.memory_limit / State#state.total_memory, State};
handle_call({set_vm_memory_high_watermark, MemFraction}, _From, State) ->
- MemLimit = get_mem_limit(MemFraction, State#state.total_memory),
+ State1 = set_mem_limits(State, MemFraction),
error_logger:info_msg("Memory alarm changed to ~p, ~p bytes.~n",
- [MemFraction, MemLimit]),
- {reply, ok, State#state{memory_limit = MemLimit}};
+ [MemFraction, State1#state.memory_limit]),
+ {reply, ok, State1};
handle_call(get_check_interval, _From, State) ->
{reply, State#state.timeout, State};
@@ -168,6 +154,30 @@ code_change(_OldVsn, State, _Extra) ->
%% Server Internals
%%----------------------------------------------------------------------------
+set_mem_limits(State, MemFraction) ->
+ TotalMemory =
+ case get_total_memory() of
+ unknown ->
+ case State of
+ #state { total_memory = undefined,
+ memory_limit = undefined } ->
+ error_logger:warning_msg(
+ "Unknown total memory size for your OS ~p. "
+ "Assuming memory size is ~pMB.~n",
+ [os:type(),
+ trunc(?MEMORY_SIZE_FOR_UNKNOWN_OS/?ONE_MB)]);
+ _ ->
+ ok
+ end,
+ ?MEMORY_SIZE_FOR_UNKNOWN_OS;
+ M -> M
+ end,
+ MemLim = get_mem_limit(MemFraction, TotalMemory),
+ error_logger:info_msg("Memory limit set to ~pMB of ~pMB total.~n",
+ [trunc(MemLim/?ONE_MB), trunc(TotalMemory/?ONE_MB)]),
+ internal_update(State #state { total_memory = TotalMemory,
+ memory_limit = MemLim }).
+
internal_update(State = #state { memory_limit = MemLimit,
alarmed = Alarmed}) ->
MemUsed = erlang:memory(total),
@@ -322,9 +332,9 @@ parse_line_sunos(Line) ->
[Value1 | UnitsRest] = string:tokens(RHS, " "),
Value2 = case UnitsRest of
["Gigabytes"] ->
- list_to_integer(Value1) * 1024 * 1024 * 1024;
+ list_to_integer(Value1) * ?ONE_MB * 1024;
["Megabytes"] ->
- list_to_integer(Value1) * 1024 * 1024;
+ list_to_integer(Value1) * ?ONE_MB;
["Kilobytes"] ->
list_to_integer(Value1) * 1024;
_ ->