summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-06-22 13:32:44 +0100
committerMatthew Sackman <matthew@lshift.net>2009-06-22 13:32:44 +0100
commit790e6cf6e736a95f89e13ebeb5f7659114d3359c (patch)
tree3c62c15ecc3a43ce3df21b4e413fdb17037850ad
parent64e4ba35b4db99cd992c728741448a7a24bbb958 (diff)
downloadrabbitmq-server-git-790e6cf6e736a95f89e13ebeb5f7659114d3359c.tar.gz
Switched to tracking memory size of the queue at all times. Removed use of process_info(memory,self()) for reasons outlined in the bug comments. The annoying thing about using a 10% change as the threshold is that it means you get many many more updates when the queue is empty because the % change is much greater.
-rw-r--r--src/rabbit_amqqueue_process.erl5
-rw-r--r--src/rabbit_mixed_queue.erl146
2 files changed, 77 insertions, 74 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index b19ff7a014..0eff9e1b3a 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -540,9 +540,8 @@ i(Item, _) ->
report_memory(State = #q { old_memory_report = OldMem,
mixed_state = MS }) ->
- MSize = rabbit_mixed_queue:estimate_extra_memory(MS),
- {memory, PSize} = process_info(self(), memory),
- NewMem = case MSize + PSize of
+ MSize = rabbit_mixed_queue:estimate_queue_memory(MS),
+ NewMem = case MSize of
0 -> 1; %% avoid / 0
N -> N
end,
diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl
index e0f9d2f226..5c00b38005 100644
--- a/src/rabbit_mixed_queue.erl
+++ b/src/rabbit_mixed_queue.erl
@@ -39,7 +39,7 @@
tx_publish/2, tx_commit/3, tx_cancel/2, requeue/2, purge/1,
length/1, is_empty/1, delete_queue/1]).
--export([to_disk_only_mode/2, to_mixed_mode/2, estimate_extra_memory/1]).
+-export([to_disk_only_mode/2, to_mixed_mode/2, estimate_queue_memory/1]).
-record(mqstate, { mode,
msg_buf,
@@ -85,7 +85,7 @@
-spec(to_disk_only_mode/2 :: ([message()], mqstate()) -> okmqs()).
-spec(to_mixed_mode/2 :: ([message()], mqstate()) -> okmqs()).
--spec(estimate_extra_memory/1 :: (mqstate()) -> non_neg_integer).
+-spec(estimate_queue_memory/1 :: (mqstate()) -> non_neg_integer).
-endif.
@@ -116,28 +116,25 @@ to_disk_only_mode(TxnMessages, State =
%% Note we also batch together messages on disk so that we minimise
%% the calls to requeue.
Msgs = queue:to_list(MsgBuf),
- {Requeue, Size} =
+ Requeue =
lists:foldl(
fun ({Msg = #basic_message { guid = MsgId }, IsDelivered, OnDisk},
- {RQueueAcc, SizeAcc}) ->
- SizeAcc1 = SizeAcc + size_of_message(Msg),
- RQueueAcc1 =
- if OnDisk ->
- {MsgId, IsDelivered, AckTag, _PersistRemaining} =
- rabbit_disk_queue:phantom_deliver(Q),
- [ {AckTag, {next, IsDelivered}} | RQueueAcc ];
- true ->
- ok = if [] == RQueueAcc -> ok;
- true ->
- rabbit_disk_queue:requeue_with_seqs(
- Q, lists:reverse(RQueueAcc))
- end,
- ok = rabbit_disk_queue:publish(
- Q, Msg, false),
- []
- end,
- {RQueueAcc1, SizeAcc1}
- end, {[], 0}, Msgs),
+ RQueueAcc) ->
+ if OnDisk ->
+ {MsgId, IsDelivered, AckTag, _PersistRemaining} =
+ rabbit_disk_queue:phantom_deliver(Q),
+ [ {AckTag, {next, IsDelivered}} | RQueueAcc ];
+ true ->
+ ok = if [] == RQueueAcc -> ok;
+ true ->
+ rabbit_disk_queue:requeue_with_seqs(
+ Q, lists:reverse(RQueueAcc))
+ end,
+ ok = rabbit_disk_queue:publish(
+ Q, Msg, false),
+ []
+ end
+ end, [], Msgs),
ok = if [] == Requeue -> ok;
true ->
rabbit_disk_queue:requeue_with_seqs(Q, lists:reverse(Requeue))
@@ -153,8 +150,7 @@ to_disk_only_mode(TxnMessages, State =
_ -> rabbit_disk_queue:tx_publish(Msg)
end
end, TxnMessages),
- {ok,
- State #mqstate { mode = disk, msg_buf = queue:new(), memory_size = Size }}.
+ {ok, State #mqstate { mode = disk, msg_buf = queue:new() }}.
to_mixed_mode(_TxnMessages, State = #mqstate { mode = mixed }) ->
{ok, State};
@@ -184,7 +180,7 @@ to_mixed_mode(TxnMessages, State =
end
end, [], TxnMessages),
ok = rabbit_disk_queue:tx_cancel(lists:reverse(Cancel)),
- {ok, State #mqstate { mode = mixed, msg_buf = MsgBuf1, memory_size = 0 }}.
+ {ok, State #mqstate { mode = mixed, msg_buf = MsgBuf1 }}.
purge_non_persistent_messages(State = #mqstate { mode = disk, queue = Q,
is_durable = IsDurable }) ->
@@ -223,16 +219,17 @@ publish(Msg, State = #mqstate { mode = disk, queue = Q, length = Length,
ok = rabbit_disk_queue:publish(Q, Msg, false),
Size1 = Size + size_of_message(Msg),
{ok, State #mqstate { length = Length + 1, memory_size = Size1 }};
-publish(Msg = #basic_message { is_persistent = IsPersistent },
- State = #mqstate { queue = Q, mode = mixed, is_durable = IsDurable,
- msg_buf = MsgBuf, length = Length }) ->
+publish(Msg = #basic_message { is_persistent = IsPersistent }, State =
+ #mqstate { queue = Q, mode = mixed, is_durable = IsDurable,
+ msg_buf = MsgBuf, length = Length, memory_size = Size }) ->
OnDisk = IsDurable andalso IsPersistent,
ok = if OnDisk ->
rabbit_disk_queue:publish(Q, Msg, false);
true -> ok
end,
+ Size1 = Size + size_of_message(Msg),
{ok, State #mqstate { msg_buf = queue:in({Msg, false, OnDisk}, MsgBuf),
- length = Length + 1 }}.
+ length = Length + 1, memory_size = Size1 }}.
%% Assumption here is that the queue is empty already (only called via
%% attempt_immediate_delivery).
@@ -264,15 +261,16 @@ deliver(State = #mqstate { mode = disk, queue = Q, is_durable = IsDurable,
{Msg = #basic_message { is_persistent = IsPersistent },
_Size, IsDelivered, AckTag, Remaining}
= rabbit_disk_queue:deliver(Q),
- Size = size_of_message(Msg),
+ QSize1 = QSize - size_of_message(Msg),
AckTag1 = if IsPersistent andalso IsDurable -> AckTag;
true -> ok = rabbit_disk_queue:ack(Q, [AckTag]),
noack
end,
{{Msg, IsDelivered, AckTag1, Remaining},
- State #mqstate { length = Length - 1, memory_size = QSize - Size }};
-deliver(State = #mqstate { mode = mixed, queue = Q, is_durable = IsDurable,
- msg_buf = MsgBuf, length = Length }) ->
+ State #mqstate { length = Length - 1, memory_size = QSize1 }};
+deliver(State =
+ #mqstate { mode = mixed, msg_buf = MsgBuf, is_durable = IsDurable,
+ queue = Q, length = Length, memory_size = QSize }) ->
{{value, {Msg = #basic_message { guid = MsgId,
is_persistent = IsPersistent },
IsDelivered, OnDisk}}, MsgBuf1}
@@ -290,8 +288,9 @@ deliver(State = #mqstate { mode = mixed, queue = Q, is_durable = IsDurable,
true -> noack
end,
Rem = Length - 1,
+ QSize1 = QSize - size_of_message(Msg),
{{Msg, IsDelivered, AckTag, Rem},
- State #mqstate { msg_buf = MsgBuf1, length = Rem }}.
+ State #mqstate { msg_buf = MsgBuf1, length = Rem, memory_size = QSize1 }}.
remove_noacks(Acks) ->
lists:filter(fun (A) -> A /= noack end, Acks).
@@ -303,17 +302,18 @@ ack(Acks, State = #mqstate { queue = Q }) ->
{ok, State}
end.
-tx_publish(Msg, State = #mqstate { mode = disk, memory_size = Size }) ->
+tx_publish(Msg, State = #mqstate { mode = disk, memory_size = QSize }) ->
ok = rabbit_disk_queue:tx_publish(Msg),
- {ok, State #mqstate { memory_size = Size + size_of_message(Msg) }};
-tx_publish(Msg = #basic_message { is_persistent = IsPersistent },
- State = #mqstate { mode = mixed, is_durable = IsDurable })
+ {ok, State #mqstate { memory_size = QSize + size_of_message(Msg) }};
+tx_publish(Msg = #basic_message { is_persistent = IsPersistent }, State =
+ #mqstate { mode = mixed, is_durable = IsDurable,
+ memory_size = QSize })
when IsDurable andalso IsPersistent ->
ok = rabbit_disk_queue:tx_publish(Msg),
- {ok, State};
-tx_publish(_Msg, State = #mqstate { mode = mixed }) ->
+ {ok, State #mqstate { memory_size = QSize + size_of_message(Msg) }};
+tx_publish(Msg, State = #mqstate { mode = mixed, memory_size = QSize }) ->
%% this message will reappear in the tx_commit, so ignore for now
- {ok, State}.
+ {ok, State #mqstate { memory_size = QSize + size_of_message(Msg) }}.
only_msg_ids(Pubs) ->
lists:map(fun (Msg) -> Msg #basic_message.guid end, Pubs).
@@ -353,37 +353,38 @@ tx_commit(Publishes, Acks, State = #mqstate { mode = mixed, queue = Q,
{ok, State #mqstate { msg_buf = MsgBuf1,
length = Length + erlang:length(Publishes) }}.
-only_persistent_msg_ids(Pubs) ->
- lists:reverse(
- lists:foldl(
- fun (Msg = #basic_message { is_persistent = IsPersistent }, Acc) ->
- if IsPersistent -> [Msg #basic_message.guid | Acc];
- true -> Acc
- end
- end, [], Pubs)).
-
-tx_cancel(Publishes, State = #mqstate { mode = disk, memory_size = TSize }) ->
+tx_cancel(Publishes, State = #mqstate { mode = disk, memory_size = QSize }) ->
{MsgIds, CSize} =
lists:foldl(
fun (Msg = #basic_message { guid = MsgId }, {MsgIdsAcc, CSizeAcc}) ->
{[MsgId | MsgIdsAcc], CSizeAcc + size_of_message(Msg)}
end, {[], 0}, Publishes),
ok = rabbit_disk_queue:tx_cancel(lists:reverse(MsgIds)),
- {ok, State #mqstate { memory_size = TSize - CSize }};
-tx_cancel(Publishes,
- State = #mqstate { mode = mixed, is_durable = IsDurable }) ->
+ {ok, State #mqstate { memory_size = QSize - CSize }};
+tx_cancel(Publishes, State = #mqstate { mode = mixed, is_durable = IsDurable,
+ memory_size = QSize }) ->
+ {PersistentPubs, CSize} =
+ lists:foldl(
+ fun (Msg = #basic_message { is_persistent = IsPersistent,
+ guid = MsgId }, {Acc, CSizeAcc}) ->
+ CSizeAcc1 = CSizeAcc + size_of_message(Msg),
+ {case IsPersistent of
+ true -> [MsgId | Acc];
+ _ -> Acc
+ end, CSizeAcc1}
+ end, {[], 0}, Publishes),
ok =
if IsDurable ->
- rabbit_disk_queue:tx_cancel(only_persistent_msg_ids(Publishes));
+ rabbit_disk_queue:tx_cancel(lists:reverse(PersistentPubs));
true -> ok
end,
- {ok, State}.
+ {ok, State #mqstate { memory_size = QSize - CSize }}.
%% [{Msg, AckTag}]
requeue(MessagesWithAckTags, State = #mqstate { mode = disk, queue = Q,
is_durable = IsDurable,
length = Length,
- memory_size = TSize
+ memory_size = QSize
}) ->
%% here, we may have messages with no ack tags, because of the
%% fact they are not persistent, but nevertheless we want to
@@ -391,42 +392,44 @@ requeue(MessagesWithAckTags, State = #mqstate { mode = disk, queue = Q,
{Requeue, CSize}
= lists:foldl(
fun ({Msg = #basic_message { is_persistent = IsPersistent },
- AckTag}, {RQ, SizeAcc})
+ AckTag}, {RQ, CSizeAcc})
when IsPersistent andalso IsDurable ->
- {[AckTag | RQ], SizeAcc + size_of_message(Msg)};
- ({Msg, _AckTag}, {RQ, SizeAcc}) ->
+ {[AckTag | RQ], CSizeAcc + size_of_message(Msg)};
+ ({Msg, _AckTag}, {RQ, CSizeAcc}) ->
ok = if RQ == [] -> ok;
true -> rabbit_disk_queue:requeue(
Q, lists:reverse(RQ))
end,
_AckTag1 = rabbit_disk_queue:publish(
Q, Msg, true),
- {[], SizeAcc + size_of_message(Msg)}
+ {[], CSizeAcc + size_of_message(Msg)}
end, {[], 0}, MessagesWithAckTags),
ok = rabbit_disk_queue:requeue(Q, lists:reverse(Requeue)),
{ok, State #mqstate { length = Length + erlang:length(MessagesWithAckTags),
- memory_size = TSize + CSize
+ memory_size = QSize + CSize
}};
requeue(MessagesWithAckTags, State = #mqstate { mode = mixed, queue = Q,
msg_buf = MsgBuf,
is_durable = IsDurable,
- length = Length
+ length = Length,
+ memory_size = QSize
}) ->
- {PersistentPubs, MsgBuf1} =
+ {PersistentPubs, MsgBuf1, CSize} =
lists:foldl(
fun ({Msg = #basic_message { is_persistent = IsPersistent }, AckTag},
- {Acc, MsgBuf2}) ->
+ {Acc, MsgBuf2, CSizeAcc}) ->
OnDisk = IsDurable andalso IsPersistent,
Acc1 =
if OnDisk -> [AckTag | Acc];
true -> Acc
end,
- {Acc1, queue:in({Msg, true, OnDisk}, MsgBuf2)}
- end, {[], MsgBuf}, MessagesWithAckTags),
+ CSizeAcc1 = CSizeAcc + size_of_message(Msg),
+ {Acc1, queue:in({Msg, true, OnDisk}, MsgBuf2), CSizeAcc1}
+ end, {[], MsgBuf, 0}, MessagesWithAckTags),
ok = if [] == PersistentPubs -> ok;
true -> rabbit_disk_queue:requeue(Q, lists:reverse(PersistentPubs))
end,
- {ok, State #mqstate {msg_buf = MsgBuf1,
+ {ok, State #mqstate {msg_buf = MsgBuf1, memory_size = QSize + CSize,
length = Length + erlang:length(MessagesWithAckTags)}}.
purge(State = #mqstate { queue = Q, mode = disk, length = Count }) ->
@@ -434,14 +437,15 @@ purge(State = #mqstate { queue = Q, mode = disk, length = Count }) ->
{Count, State #mqstate { length = 0, memory_size = 0 }};
purge(State = #mqstate { queue = Q, mode = mixed, length = Length }) ->
rabbit_disk_queue:purge(Q),
- {Length, State #mqstate { msg_buf = queue:new(), length = 0 }}.
+ {Length,
+ State #mqstate { msg_buf = queue:new(), length = 0, memory_size = 0 }}.
delete_queue(State = #mqstate { queue = Q, mode = disk }) ->
rabbit_disk_queue:delete_queue(Q),
{ok, State #mqstate { length = 0, memory_size = 0 }};
delete_queue(State = #mqstate { queue = Q, mode = mixed }) ->
rabbit_disk_queue:delete_queue(Q),
- {ok, State #mqstate { msg_buf = queue:new(), length = 0 }}.
+ {ok, State #mqstate { msg_buf = queue:new(), length = 0, memory_size = 0 }}.
length(#mqstate { length = Length }) ->
Length.
@@ -449,5 +453,5 @@ length(#mqstate { length = Length }) ->
is_empty(#mqstate { length = Length }) ->
0 == Length.
-estimate_extra_memory(#mqstate { memory_size = Size }) ->
+estimate_queue_memory(#mqstate { memory_size = Size }) ->
2 * Size. %% Magic number. Will probably need playing with.