diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-06-22 13:32:44 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-06-22 13:32:44 +0100 |
| commit | 790e6cf6e736a95f89e13ebeb5f7659114d3359c (patch) | |
| tree | 3c62c15ecc3a43ce3df21b4e413fdb17037850ad | |
| parent | 64e4ba35b4db99cd992c728741448a7a24bbb958 (diff) | |
| download | rabbitmq-server-git-790e6cf6e736a95f89e13ebeb5f7659114d3359c.tar.gz | |
Switched to tracking memory size of the queue at all times. Removed use of process_info(memory,self()) for reasons outlined in the bug comments. The annoying thing about using a 10% change as the threshold is that it means you get many many more updates when the queue is empty because the % change is much greater.
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_mixed_queue.erl | 146 |
2 files changed, 77 insertions, 74 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index b19ff7a014..0eff9e1b3a 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -540,9 +540,8 @@ i(Item, _) -> report_memory(State = #q { old_memory_report = OldMem, mixed_state = MS }) -> - MSize = rabbit_mixed_queue:estimate_extra_memory(MS), - {memory, PSize} = process_info(self(), memory), - NewMem = case MSize + PSize of + MSize = rabbit_mixed_queue:estimate_queue_memory(MS), + NewMem = case MSize of 0 -> 1; %% avoid / 0 N -> N end, diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl index e0f9d2f226..5c00b38005 100644 --- a/src/rabbit_mixed_queue.erl +++ b/src/rabbit_mixed_queue.erl @@ -39,7 +39,7 @@ tx_publish/2, tx_commit/3, tx_cancel/2, requeue/2, purge/1, length/1, is_empty/1, delete_queue/1]). --export([to_disk_only_mode/2, to_mixed_mode/2, estimate_extra_memory/1]). +-export([to_disk_only_mode/2, to_mixed_mode/2, estimate_queue_memory/1]). -record(mqstate, { mode, msg_buf, @@ -85,7 +85,7 @@ -spec(to_disk_only_mode/2 :: ([message()], mqstate()) -> okmqs()). -spec(to_mixed_mode/2 :: ([message()], mqstate()) -> okmqs()). --spec(estimate_extra_memory/1 :: (mqstate()) -> non_neg_integer). +-spec(estimate_queue_memory/1 :: (mqstate()) -> non_neg_integer). -endif. @@ -116,28 +116,25 @@ to_disk_only_mode(TxnMessages, State = %% Note we also batch together messages on disk so that we minimise %% the calls to requeue. Msgs = queue:to_list(MsgBuf), - {Requeue, Size} = + Requeue = lists:foldl( fun ({Msg = #basic_message { guid = MsgId }, IsDelivered, OnDisk}, - {RQueueAcc, SizeAcc}) -> - SizeAcc1 = SizeAcc + size_of_message(Msg), - RQueueAcc1 = - if OnDisk -> - {MsgId, IsDelivered, AckTag, _PersistRemaining} = - rabbit_disk_queue:phantom_deliver(Q), - [ {AckTag, {next, IsDelivered}} | RQueueAcc ]; - true -> - ok = if [] == RQueueAcc -> ok; - true -> - rabbit_disk_queue:requeue_with_seqs( - Q, lists:reverse(RQueueAcc)) - end, - ok = rabbit_disk_queue:publish( - Q, Msg, false), - [] - end, - {RQueueAcc1, SizeAcc1} - end, {[], 0}, Msgs), + RQueueAcc) -> + if OnDisk -> + {MsgId, IsDelivered, AckTag, _PersistRemaining} = + rabbit_disk_queue:phantom_deliver(Q), + [ {AckTag, {next, IsDelivered}} | RQueueAcc ]; + true -> + ok = if [] == RQueueAcc -> ok; + true -> + rabbit_disk_queue:requeue_with_seqs( + Q, lists:reverse(RQueueAcc)) + end, + ok = rabbit_disk_queue:publish( + Q, Msg, false), + [] + end + end, [], Msgs), ok = if [] == Requeue -> ok; true -> rabbit_disk_queue:requeue_with_seqs(Q, lists:reverse(Requeue)) @@ -153,8 +150,7 @@ to_disk_only_mode(TxnMessages, State = _ -> rabbit_disk_queue:tx_publish(Msg) end end, TxnMessages), - {ok, - State #mqstate { mode = disk, msg_buf = queue:new(), memory_size = Size }}. + {ok, State #mqstate { mode = disk, msg_buf = queue:new() }}. to_mixed_mode(_TxnMessages, State = #mqstate { mode = mixed }) -> {ok, State}; @@ -184,7 +180,7 @@ to_mixed_mode(TxnMessages, State = end end, [], TxnMessages), ok = rabbit_disk_queue:tx_cancel(lists:reverse(Cancel)), - {ok, State #mqstate { mode = mixed, msg_buf = MsgBuf1, memory_size = 0 }}. + {ok, State #mqstate { mode = mixed, msg_buf = MsgBuf1 }}. purge_non_persistent_messages(State = #mqstate { mode = disk, queue = Q, is_durable = IsDurable }) -> @@ -223,16 +219,17 @@ publish(Msg, State = #mqstate { mode = disk, queue = Q, length = Length, ok = rabbit_disk_queue:publish(Q, Msg, false), Size1 = Size + size_of_message(Msg), {ok, State #mqstate { length = Length + 1, memory_size = Size1 }}; -publish(Msg = #basic_message { is_persistent = IsPersistent }, - State = #mqstate { queue = Q, mode = mixed, is_durable = IsDurable, - msg_buf = MsgBuf, length = Length }) -> +publish(Msg = #basic_message { is_persistent = IsPersistent }, State = + #mqstate { queue = Q, mode = mixed, is_durable = IsDurable, + msg_buf = MsgBuf, length = Length, memory_size = Size }) -> OnDisk = IsDurable andalso IsPersistent, ok = if OnDisk -> rabbit_disk_queue:publish(Q, Msg, false); true -> ok end, + Size1 = Size + size_of_message(Msg), {ok, State #mqstate { msg_buf = queue:in({Msg, false, OnDisk}, MsgBuf), - length = Length + 1 }}. + length = Length + 1, memory_size = Size1 }}. %% Assumption here is that the queue is empty already (only called via %% attempt_immediate_delivery). @@ -264,15 +261,16 @@ deliver(State = #mqstate { mode = disk, queue = Q, is_durable = IsDurable, {Msg = #basic_message { is_persistent = IsPersistent }, _Size, IsDelivered, AckTag, Remaining} = rabbit_disk_queue:deliver(Q), - Size = size_of_message(Msg), + QSize1 = QSize - size_of_message(Msg), AckTag1 = if IsPersistent andalso IsDurable -> AckTag; true -> ok = rabbit_disk_queue:ack(Q, [AckTag]), noack end, {{Msg, IsDelivered, AckTag1, Remaining}, - State #mqstate { length = Length - 1, memory_size = QSize - Size }}; -deliver(State = #mqstate { mode = mixed, queue = Q, is_durable = IsDurable, - msg_buf = MsgBuf, length = Length }) -> + State #mqstate { length = Length - 1, memory_size = QSize1 }}; +deliver(State = + #mqstate { mode = mixed, msg_buf = MsgBuf, is_durable = IsDurable, + queue = Q, length = Length, memory_size = QSize }) -> {{value, {Msg = #basic_message { guid = MsgId, is_persistent = IsPersistent }, IsDelivered, OnDisk}}, MsgBuf1} @@ -290,8 +288,9 @@ deliver(State = #mqstate { mode = mixed, queue = Q, is_durable = IsDurable, true -> noack end, Rem = Length - 1, + QSize1 = QSize - size_of_message(Msg), {{Msg, IsDelivered, AckTag, Rem}, - State #mqstate { msg_buf = MsgBuf1, length = Rem }}. + State #mqstate { msg_buf = MsgBuf1, length = Rem, memory_size = QSize1 }}. remove_noacks(Acks) -> lists:filter(fun (A) -> A /= noack end, Acks). @@ -303,17 +302,18 @@ ack(Acks, State = #mqstate { queue = Q }) -> {ok, State} end. -tx_publish(Msg, State = #mqstate { mode = disk, memory_size = Size }) -> +tx_publish(Msg, State = #mqstate { mode = disk, memory_size = QSize }) -> ok = rabbit_disk_queue:tx_publish(Msg), - {ok, State #mqstate { memory_size = Size + size_of_message(Msg) }}; -tx_publish(Msg = #basic_message { is_persistent = IsPersistent }, - State = #mqstate { mode = mixed, is_durable = IsDurable }) + {ok, State #mqstate { memory_size = QSize + size_of_message(Msg) }}; +tx_publish(Msg = #basic_message { is_persistent = IsPersistent }, State = + #mqstate { mode = mixed, is_durable = IsDurable, + memory_size = QSize }) when IsDurable andalso IsPersistent -> ok = rabbit_disk_queue:tx_publish(Msg), - {ok, State}; -tx_publish(_Msg, State = #mqstate { mode = mixed }) -> + {ok, State #mqstate { memory_size = QSize + size_of_message(Msg) }}; +tx_publish(Msg, State = #mqstate { mode = mixed, memory_size = QSize }) -> %% this message will reappear in the tx_commit, so ignore for now - {ok, State}. + {ok, State #mqstate { memory_size = QSize + size_of_message(Msg) }}. only_msg_ids(Pubs) -> lists:map(fun (Msg) -> Msg #basic_message.guid end, Pubs). @@ -353,37 +353,38 @@ tx_commit(Publishes, Acks, State = #mqstate { mode = mixed, queue = Q, {ok, State #mqstate { msg_buf = MsgBuf1, length = Length + erlang:length(Publishes) }}. -only_persistent_msg_ids(Pubs) -> - lists:reverse( - lists:foldl( - fun (Msg = #basic_message { is_persistent = IsPersistent }, Acc) -> - if IsPersistent -> [Msg #basic_message.guid | Acc]; - true -> Acc - end - end, [], Pubs)). - -tx_cancel(Publishes, State = #mqstate { mode = disk, memory_size = TSize }) -> +tx_cancel(Publishes, State = #mqstate { mode = disk, memory_size = QSize }) -> {MsgIds, CSize} = lists:foldl( fun (Msg = #basic_message { guid = MsgId }, {MsgIdsAcc, CSizeAcc}) -> {[MsgId | MsgIdsAcc], CSizeAcc + size_of_message(Msg)} end, {[], 0}, Publishes), ok = rabbit_disk_queue:tx_cancel(lists:reverse(MsgIds)), - {ok, State #mqstate { memory_size = TSize - CSize }}; -tx_cancel(Publishes, - State = #mqstate { mode = mixed, is_durable = IsDurable }) -> + {ok, State #mqstate { memory_size = QSize - CSize }}; +tx_cancel(Publishes, State = #mqstate { mode = mixed, is_durable = IsDurable, + memory_size = QSize }) -> + {PersistentPubs, CSize} = + lists:foldl( + fun (Msg = #basic_message { is_persistent = IsPersistent, + guid = MsgId }, {Acc, CSizeAcc}) -> + CSizeAcc1 = CSizeAcc + size_of_message(Msg), + {case IsPersistent of + true -> [MsgId | Acc]; + _ -> Acc + end, CSizeAcc1} + end, {[], 0}, Publishes), ok = if IsDurable -> - rabbit_disk_queue:tx_cancel(only_persistent_msg_ids(Publishes)); + rabbit_disk_queue:tx_cancel(lists:reverse(PersistentPubs)); true -> ok end, - {ok, State}. + {ok, State #mqstate { memory_size = QSize - CSize }}. %% [{Msg, AckTag}] requeue(MessagesWithAckTags, State = #mqstate { mode = disk, queue = Q, is_durable = IsDurable, length = Length, - memory_size = TSize + memory_size = QSize }) -> %% here, we may have messages with no ack tags, because of the %% fact they are not persistent, but nevertheless we want to @@ -391,42 +392,44 @@ requeue(MessagesWithAckTags, State = #mqstate { mode = disk, queue = Q, {Requeue, CSize} = lists:foldl( fun ({Msg = #basic_message { is_persistent = IsPersistent }, - AckTag}, {RQ, SizeAcc}) + AckTag}, {RQ, CSizeAcc}) when IsPersistent andalso IsDurable -> - {[AckTag | RQ], SizeAcc + size_of_message(Msg)}; - ({Msg, _AckTag}, {RQ, SizeAcc}) -> + {[AckTag | RQ], CSizeAcc + size_of_message(Msg)}; + ({Msg, _AckTag}, {RQ, CSizeAcc}) -> ok = if RQ == [] -> ok; true -> rabbit_disk_queue:requeue( Q, lists:reverse(RQ)) end, _AckTag1 = rabbit_disk_queue:publish( Q, Msg, true), - {[], SizeAcc + size_of_message(Msg)} + {[], CSizeAcc + size_of_message(Msg)} end, {[], 0}, MessagesWithAckTags), ok = rabbit_disk_queue:requeue(Q, lists:reverse(Requeue)), {ok, State #mqstate { length = Length + erlang:length(MessagesWithAckTags), - memory_size = TSize + CSize + memory_size = QSize + CSize }}; requeue(MessagesWithAckTags, State = #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf, is_durable = IsDurable, - length = Length + length = Length, + memory_size = QSize }) -> - {PersistentPubs, MsgBuf1} = + {PersistentPubs, MsgBuf1, CSize} = lists:foldl( fun ({Msg = #basic_message { is_persistent = IsPersistent }, AckTag}, - {Acc, MsgBuf2}) -> + {Acc, MsgBuf2, CSizeAcc}) -> OnDisk = IsDurable andalso IsPersistent, Acc1 = if OnDisk -> [AckTag | Acc]; true -> Acc end, - {Acc1, queue:in({Msg, true, OnDisk}, MsgBuf2)} - end, {[], MsgBuf}, MessagesWithAckTags), + CSizeAcc1 = CSizeAcc + size_of_message(Msg), + {Acc1, queue:in({Msg, true, OnDisk}, MsgBuf2), CSizeAcc1} + end, {[], MsgBuf, 0}, MessagesWithAckTags), ok = if [] == PersistentPubs -> ok; true -> rabbit_disk_queue:requeue(Q, lists:reverse(PersistentPubs)) end, - {ok, State #mqstate {msg_buf = MsgBuf1, + {ok, State #mqstate {msg_buf = MsgBuf1, memory_size = QSize + CSize, length = Length + erlang:length(MessagesWithAckTags)}}. purge(State = #mqstate { queue = Q, mode = disk, length = Count }) -> @@ -434,14 +437,15 @@ purge(State = #mqstate { queue = Q, mode = disk, length = Count }) -> {Count, State #mqstate { length = 0, memory_size = 0 }}; purge(State = #mqstate { queue = Q, mode = mixed, length = Length }) -> rabbit_disk_queue:purge(Q), - {Length, State #mqstate { msg_buf = queue:new(), length = 0 }}. + {Length, + State #mqstate { msg_buf = queue:new(), length = 0, memory_size = 0 }}. delete_queue(State = #mqstate { queue = Q, mode = disk }) -> rabbit_disk_queue:delete_queue(Q), {ok, State #mqstate { length = 0, memory_size = 0 }}; delete_queue(State = #mqstate { queue = Q, mode = mixed }) -> rabbit_disk_queue:delete_queue(Q), - {ok, State #mqstate { msg_buf = queue:new(), length = 0 }}. + {ok, State #mqstate { msg_buf = queue:new(), length = 0, memory_size = 0 }}. length(#mqstate { length = Length }) -> Length. @@ -449,5 +453,5 @@ length(#mqstate { length = Length }) -> is_empty(#mqstate { length = Length }) -> 0 == Length. -estimate_extra_memory(#mqstate { memory_size = Size }) -> +estimate_queue_memory(#mqstate { memory_size = Size }) -> 2 * Size. %% Magic number. Will probably need playing with. |
