summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-06-18 17:27:27 +0100
committerMatthew Sackman <matthew@lshift.net>2009-06-18 17:27:27 +0100
commit8b71dcf0fb2e41fe3dc6adc5f044ef0b9b65caee (patch)
tree72874558408b11be0e2729eacd81da2c1af410d0
parent3d4e44e5a7cca75ce09c1bfa86d5eccd271f37ce (diff)
downloadrabbitmq-server-git-8b71dcf0fb2e41fe3dc6adc5f044ef0b9b65caee.tar.gz
initial work making the mixed_queue keep track of some number vaguely related to memory use when it's in disk only mode which could be used to estimate how much more memory is needed to switch to mixed mode
-rw-r--r--src/rabbit_mixed_queue.erl133
1 files changed, 82 insertions, 51 deletions
diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl
index 1793b6359d..edbc51a63f 100644
--- a/src/rabbit_mixed_queue.erl
+++ b/src/rabbit_mixed_queue.erl
@@ -39,13 +39,14 @@
tx_publish/2, tx_commit/3, tx_cancel/2, requeue/2, purge/1,
length/1, is_empty/1, delete_queue/1]).
--export([to_disk_only_mode/1, to_mixed_mode/1]).
+-export([to_disk_only_mode/1, to_mixed_mode/1, estimate_extra_memory/1]).
-record(mqstate, { mode,
msg_buf,
queue,
is_durable,
- length
+ length,
+ memory_size
}
).
@@ -56,7 +57,8 @@
msg_buf :: queue(),
queue :: queue_name(),
is_durable :: bool(),
- length :: non_neg_integer()
+ length :: non_neg_integer(),
+ memory_size :: non_neg_integer()
}).
-type(acktag() :: ( 'noack' | { non_neg_integer(), non_neg_integer() })).
-type(okmqs() :: {'ok', mqstate()}).
@@ -79,13 +81,14 @@
-spec(length/1 :: (mqstate()) -> non_neg_integer()).
-spec(is_empty/1 :: (mqstate()) -> bool()).
+-spec(estimate_extra_memory/1 :: (mqstate()) -> non_neg_integer).
-endif.
init(Queue, IsDurable, disk) ->
purge_non_persistent_messages(
#mqstate { mode = disk, msg_buf = queue:new(), queue = Queue,
- is_durable = IsDurable, length = 0 });
+ is_durable = IsDurable, length = 0, memory_size = 0 });
init(Queue, IsDurable, mixed) ->
{ok, State} = init(Queue, IsDurable, disk),
to_mixed_mode(State).
@@ -102,30 +105,35 @@ to_disk_only_mode(State =
%% Note we also batch together messages on disk so that we minimise
%% the calls to requeue.
Msgs = queue:to_list(MsgBuf),
- Requeue =
+ {Requeue, Size} =
lists:foldl(
fun ({Msg = #basic_message { guid = MsgId }, IsDelivered, OnDisk},
- RQueueAcc) ->
- if OnDisk ->
- {MsgId, IsDelivered, AckTag, _PersistRemaining} =
- rabbit_disk_queue:phantom_deliver(Q),
- [ {AckTag, {next, IsDelivered}} | RQueueAcc ];
- true ->
- ok = if [] == RQueueAcc -> ok;
- true ->
- rabbit_disk_queue:requeue_with_seqs(
- Q, lists:reverse(RQueueAcc))
- end,
- ok = rabbit_disk_queue:publish(
- Q, MsgId, msg_to_bin(Msg), false),
- []
- end
- end, [], Msgs),
+ {RQueueAcc, SizeAcc}) ->
+ {MsgBin, MsgSize} = msg_to_bin(Msg),
+ SizeAcc1 = SizeAcc + MsgSize,
+ RQueueAcc1 =
+ if OnDisk ->
+ {MsgId, IsDelivered, AckTag, _PersistRemaining} =
+ rabbit_disk_queue:phantom_deliver(Q),
+ [ {AckTag, {next, IsDelivered}} | RQueueAcc ];
+ true ->
+ ok = if [] == RQueueAcc -> ok;
+ true ->
+ rabbit_disk_queue:requeue_with_seqs(
+ Q, lists:reverse(RQueueAcc))
+ end,
+ ok = rabbit_disk_queue:publish(
+ Q, MsgId, MsgBin, false),
+ []
+ end,
+ {RQueueAcc1, SizeAcc1}
+ end, {[], 0}, Msgs),
ok = if [] == Requeue -> ok;
true ->
rabbit_disk_queue:requeue_with_seqs(Q, lists:reverse(Requeue))
end,
- {ok, State #mqstate { mode = disk, msg_buf = queue:new() }}.
+ {ok,
+ State #mqstate { mode = disk, msg_buf = queue:new(), memory_size = Size }}.
to_mixed_mode(State = #mqstate { mode = mixed }) ->
{ok, State};
@@ -141,7 +149,7 @@ to_mixed_mode(State = #mqstate { mode = disk, queue = Q, length = Length }) ->
Msg = #basic_message { guid = MsgId } = bin_to_msg(MsgBin),
{queue:in({Msg, IsDelivered, true}, Buf), L+1}
end, {queue:new(), 0}, QList),
- {ok, State #mqstate { mode = mixed, msg_buf = MsgBuf1 }}.
+ {ok, State #mqstate { mode = mixed, msg_buf = MsgBuf1, memory_size = 0 }}.
purge_non_persistent_messages(State = #mqstate { mode = disk, queue = Q,
is_durable = IsDurable }) ->
@@ -178,21 +186,25 @@ deliver_all_messages(Q, IsDurable, Acks, Requeue, Length) ->
msg_to_bin(Msg = #basic_message { content = Content }) ->
ClearedContent = rabbit_binary_parser:clear_decoded_content(Content),
- term_to_binary(Msg #basic_message { content = ClearedContent }).
+ Bin = term_to_binary(Msg #basic_message { content = ClearedContent }),
+ {Bin, size(Bin)}.
bin_to_msg(MsgBin) ->
binary_to_term(MsgBin).
publish(Msg = #basic_message { guid = MsgId },
- State = #mqstate { mode = disk, queue = Q, length = Length }) ->
- ok = rabbit_disk_queue:publish(Q, MsgId, msg_to_bin(Msg), false),
- {ok, State #mqstate { length = Length + 1 }};
+ State = #mqstate { mode = disk, queue = Q, length = Length,
+ memory_size = Size}) ->
+ {MsgBin, MsgSize} = msg_to_bin(Msg),
+ ok = rabbit_disk_queue:publish(Q, MsgId, MsgBin, false),
+ {ok, State #mqstate { length = Length + 1, memory_size = Size + MsgSize }};
publish(Msg = #basic_message { guid = MsgId, is_persistent = IsPersistent },
State = #mqstate { queue = Q, mode = mixed, is_durable = IsDurable,
msg_buf = MsgBuf, length = Length }) ->
OnDisk = IsDurable andalso IsPersistent,
+ {MsgBin, _MsgSize} = msg_to_bin(Msg),
ok = if OnDisk ->
- rabbit_disk_queue:publish(Q, MsgId, msg_to_bin(Msg), false);
+ rabbit_disk_queue:publish(Q, MsgId, MsgBin, false);
true -> ok
end,
{ok, State #mqstate { msg_buf = queue:in({Msg, false, OnDisk}, MsgBuf),
@@ -205,7 +217,8 @@ publish_delivered(Msg =
State = #mqstate { mode = Mode, is_durable = IsDurable,
queue = Q, length = 0 })
when Mode =:= disk orelse (IsDurable andalso IsPersistent) ->
- rabbit_disk_queue:publish(Q, MsgId, msg_to_bin(Msg), false),
+ {MsgBin, _MsgSize} = msg_to_bin(Msg),
+ rabbit_disk_queue:publish(Q, MsgId, MsgBin, false),
if IsDurable andalso IsPersistent ->
%% must call phantom_deliver otherwise the msg remains at
%% the head of the queue. This is synchronous, but
@@ -224,8 +237,8 @@ publish_delivered(_Msg, State = #mqstate { mode = mixed, length = 0 }) ->
deliver(State = #mqstate { length = 0 }) ->
{empty, State};
deliver(State = #mqstate { mode = disk, queue = Q, is_durable = IsDurable,
- length = Length }) ->
- {MsgId, MsgBin, _Size, IsDelivered, AckTag, Remaining}
+ length = Length, memory_size = QSize }) ->
+ {MsgId, MsgBin, Size, IsDelivered, AckTag, Remaining}
= rabbit_disk_queue:deliver(Q),
#basic_message { guid = MsgId, is_persistent = IsPersistent } =
Msg = bin_to_msg(MsgBin),
@@ -234,8 +247,7 @@ deliver(State = #mqstate { mode = disk, queue = Q, is_durable = IsDurable,
noack
end,
{{Msg, IsDelivered, AckTag1, Remaining},
- State #mqstate { length = Length - 1}};
-
+ State #mqstate { length = Length - 1, memory_size = QSize - Size }};
deliver(State = #mqstate { mode = mixed, queue = Q, is_durable = IsDurable,
msg_buf = MsgBuf, length = Length }) ->
{{value, {Msg = #basic_message { guid = MsgId,
@@ -269,13 +281,15 @@ ack(Acks, State = #mqstate { queue = Q }) ->
end.
tx_publish(Msg = #basic_message { guid = MsgId },
- State = #mqstate { mode = disk }) ->
- ok = rabbit_disk_queue:tx_publish(MsgId, msg_to_bin(Msg)),
- {ok, State};
+ State = #mqstate { mode = disk, memory_size = Size }) ->
+ {MsgBin, MsgSize} = msg_to_bin(Msg),
+ ok = rabbit_disk_queue:tx_publish(MsgId, MsgBin),
+ {ok, State #mqstate { memory_size = Size + MsgSize }};
tx_publish(Msg = #basic_message { guid = MsgId, is_persistent = IsPersistent },
State = #mqstate { mode = mixed, is_durable = IsDurable })
when IsDurable andalso IsPersistent ->
- ok = rabbit_disk_queue:tx_publish(MsgId, msg_to_bin(Msg)),
+ {MsgBin, _MsgSize} = msg_to_bin(Msg),
+ ok = rabbit_disk_queue:tx_publish(MsgId, MsgBin),
{ok, State};
tx_publish(_Msg, State = #mqstate { mode = mixed }) ->
%% this message will reappear in the tx_commit, so ignore for now
@@ -328,9 +342,15 @@ only_persistent_msg_ids(Pubs) ->
end
end, [], Pubs)).
-tx_cancel(Publishes, State = #mqstate { mode = disk }) ->
- ok = rabbit_disk_queue:tx_cancel(only_msg_ids(Publishes)),
- {ok, State};
+tx_cancel(Publishes, State = #mqstate { mode = disk, memory_size = TSize }) ->
+ {MsgIds, CSize} =
+ lists:foldl(
+ fun (Msg = #basic_message { guid = MsgId }, {MsgIdsAcc, CSizeAcc}) ->
+ {_MsgBin, MsgSize} = msg_to_bin(Msg),
+ {[MsgId | MsgIdsAcc], CSizeAcc + MsgSize}
+ end, {[], 0}, Publishes),
+ ok = rabbit_disk_queue:tx_cancel(lists:reverse(MsgIds)),
+ {ok, State #mqstate { memory_size = TSize - CSize }};
tx_cancel(Publishes,
State = #mqstate { mode = mixed, is_durable = IsDurable }) ->
ok =
@@ -343,26 +363,34 @@ tx_cancel(Publishes,
%% [{Msg, AckTag}]
requeue(MessagesWithAckTags, State = #mqstate { mode = disk, queue = Q,
is_durable = IsDurable,
- length = Length }) ->
+ length = Length,
+ memory_size = TSize
+ }) ->
%% here, we may have messages with no ack tags, because of the
%% fact they are not persistent, but nevertheless we want to
%% requeue them. This means publishing them delivered.
- Requeue
+ {Requeue, CSize}
= lists:foldl(
- fun ({#basic_message { is_persistent = IsPersistent }, AckTag}, RQ)
+ fun ({Msg = #basic_message { is_persistent = IsPersistent },
+ AckTag}, {RQ, SizeAcc})
when IsPersistent andalso IsDurable ->
- [AckTag | RQ];
- ({Msg = #basic_message { guid = MsgId }, _AckTag}, RQ) ->
+ {_MsgBin, MsgSize} = msg_to_bin(Msg),
+ {[AckTag | RQ], SizeAcc + MsgSize};
+ ({Msg = #basic_message { guid = MsgId }, _AckTag},
+ {RQ, SizeAcc}) ->
ok = if RQ == [] -> ok;
true -> rabbit_disk_queue:requeue(
Q, lists:reverse(RQ))
end,
+ {MsgBin, MsgSize} = msg_to_bin(Msg),
_AckTag1 = rabbit_disk_queue:publish(
- Q, MsgId, msg_to_bin(Msg), true),
- []
- end, [], MessagesWithAckTags),
+ Q, MsgId, MsgBin, true),
+ {[], SizeAcc + MsgSize}
+ end, {[], 0}, MessagesWithAckTags),
ok = rabbit_disk_queue:requeue(Q, lists:reverse(Requeue)),
- {ok, State #mqstate {length = Length + erlang:length(MessagesWithAckTags)}};
+ {ok, State #mqstate { length = Length + erlang:length(MessagesWithAckTags),
+ memory_size = TSize + CSize
+ }};
requeue(MessagesWithAckTags, State = #mqstate { mode = mixed, queue = Q,
msg_buf = MsgBuf,
is_durable = IsDurable,
@@ -387,14 +415,14 @@ requeue(MessagesWithAckTags, State = #mqstate { mode = mixed, queue = Q,
purge(State = #mqstate { queue = Q, mode = disk, length = Count }) ->
Count = rabbit_disk_queue:purge(Q),
- {Count, State #mqstate { length = 0 }};
+ {Count, State #mqstate { length = 0, memory_size = 0 }};
purge(State = #mqstate { queue = Q, mode = mixed, length = Length }) ->
rabbit_disk_queue:purge(Q),
{Length, State #mqstate { msg_buf = queue:new(), length = 0 }}.
delete_queue(State = #mqstate { queue = Q, mode = disk }) ->
rabbit_disk_queue:delete_queue(Q),
- {ok, State #mqstate { length = 0 }};
+ {ok, State #mqstate { length = 0, memory_size = 0 }};
delete_queue(State = #mqstate { queue = Q, mode = mixed }) ->
rabbit_disk_queue:delete_queue(Q),
{ok, State #mqstate { msg_buf = queue:new(), length = 0 }}.
@@ -404,3 +432,6 @@ length(#mqstate { length = Length }) ->
is_empty(#mqstate { length = Length }) ->
0 == Length.
+
+estimate_extra_memory(#mqstate { memory_size = Size }) ->
+ 2 * Size. %% Magic number. Will probably need playing with.