summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_mixed_queue.erl133
1 files changed, 82 insertions, 51 deletions
diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl
index 1793b6359d..edbc51a63f 100644
--- a/src/rabbit_mixed_queue.erl
+++ b/src/rabbit_mixed_queue.erl
@@ -39,13 +39,14 @@
tx_publish/2, tx_commit/3, tx_cancel/2, requeue/2, purge/1,
length/1, is_empty/1, delete_queue/1]).
--export([to_disk_only_mode/1, to_mixed_mode/1]).
+-export([to_disk_only_mode/1, to_mixed_mode/1, estimate_extra_memory/1]).
-record(mqstate, { mode,
msg_buf,
queue,
is_durable,
- length
+ length,
+ memory_size
}
).
@@ -56,7 +57,8 @@
msg_buf :: queue(),
queue :: queue_name(),
is_durable :: bool(),
- length :: non_neg_integer()
+ length :: non_neg_integer(),
+ memory_size :: non_neg_integer()
}).
-type(acktag() :: ( 'noack' | { non_neg_integer(), non_neg_integer() })).
-type(okmqs() :: {'ok', mqstate()}).
@@ -79,13 +81,14 @@
-spec(length/1 :: (mqstate()) -> non_neg_integer()).
-spec(is_empty/1 :: (mqstate()) -> bool()).
+-spec(estimate_extra_memory/1 :: (mqstate()) -> non_neg_integer).
-endif.
init(Queue, IsDurable, disk) ->
purge_non_persistent_messages(
#mqstate { mode = disk, msg_buf = queue:new(), queue = Queue,
- is_durable = IsDurable, length = 0 });
+ is_durable = IsDurable, length = 0, memory_size = 0 });
init(Queue, IsDurable, mixed) ->
{ok, State} = init(Queue, IsDurable, disk),
to_mixed_mode(State).
@@ -102,30 +105,35 @@ to_disk_only_mode(State =
%% Note we also batch together messages on disk so that we minimise
%% the calls to requeue.
Msgs = queue:to_list(MsgBuf),
- Requeue =
+ {Requeue, Size} =
lists:foldl(
fun ({Msg = #basic_message { guid = MsgId }, IsDelivered, OnDisk},
- RQueueAcc) ->
- if OnDisk ->
- {MsgId, IsDelivered, AckTag, _PersistRemaining} =
- rabbit_disk_queue:phantom_deliver(Q),
- [ {AckTag, {next, IsDelivered}} | RQueueAcc ];
- true ->
- ok = if [] == RQueueAcc -> ok;
- true ->
- rabbit_disk_queue:requeue_with_seqs(
- Q, lists:reverse(RQueueAcc))
- end,
- ok = rabbit_disk_queue:publish(
- Q, MsgId, msg_to_bin(Msg), false),
- []
- end
- end, [], Msgs),
+ {RQueueAcc, SizeAcc}) ->
+ {MsgBin, MsgSize} = msg_to_bin(Msg),
+ SizeAcc1 = SizeAcc + MsgSize,
+ RQueueAcc1 =
+ if OnDisk ->
+ {MsgId, IsDelivered, AckTag, _PersistRemaining} =
+ rabbit_disk_queue:phantom_deliver(Q),
+ [ {AckTag, {next, IsDelivered}} | RQueueAcc ];
+ true ->
+ ok = if [] == RQueueAcc -> ok;
+ true ->
+ rabbit_disk_queue:requeue_with_seqs(
+ Q, lists:reverse(RQueueAcc))
+ end,
+ ok = rabbit_disk_queue:publish(
+ Q, MsgId, MsgBin, false),
+ []
+ end,
+ {RQueueAcc1, SizeAcc1}
+ end, {[], 0}, Msgs),
ok = if [] == Requeue -> ok;
true ->
rabbit_disk_queue:requeue_with_seqs(Q, lists:reverse(Requeue))
end,
- {ok, State #mqstate { mode = disk, msg_buf = queue:new() }}.
+ {ok,
+ State #mqstate { mode = disk, msg_buf = queue:new(), memory_size = Size }}.
to_mixed_mode(State = #mqstate { mode = mixed }) ->
{ok, State};
@@ -141,7 +149,7 @@ to_mixed_mode(State = #mqstate { mode = disk, queue = Q, length = Length }) ->
Msg = #basic_message { guid = MsgId } = bin_to_msg(MsgBin),
{queue:in({Msg, IsDelivered, true}, Buf), L+1}
end, {queue:new(), 0}, QList),
- {ok, State #mqstate { mode = mixed, msg_buf = MsgBuf1 }}.
+ {ok, State #mqstate { mode = mixed, msg_buf = MsgBuf1, memory_size = 0 }}.
purge_non_persistent_messages(State = #mqstate { mode = disk, queue = Q,
is_durable = IsDurable }) ->
@@ -178,21 +186,25 @@ deliver_all_messages(Q, IsDurable, Acks, Requeue, Length) ->
msg_to_bin(Msg = #basic_message { content = Content }) ->
ClearedContent = rabbit_binary_parser:clear_decoded_content(Content),
- term_to_binary(Msg #basic_message { content = ClearedContent }).
+ Bin = term_to_binary(Msg #basic_message { content = ClearedContent }),
+ {Bin, size(Bin)}.
bin_to_msg(MsgBin) ->
binary_to_term(MsgBin).
publish(Msg = #basic_message { guid = MsgId },
- State = #mqstate { mode = disk, queue = Q, length = Length }) ->
- ok = rabbit_disk_queue:publish(Q, MsgId, msg_to_bin(Msg), false),
- {ok, State #mqstate { length = Length + 1 }};
+ State = #mqstate { mode = disk, queue = Q, length = Length,
+ memory_size = Size}) ->
+ {MsgBin, MsgSize} = msg_to_bin(Msg),
+ ok = rabbit_disk_queue:publish(Q, MsgId, MsgBin, false),
+ {ok, State #mqstate { length = Length + 1, memory_size = Size + MsgSize }};
publish(Msg = #basic_message { guid = MsgId, is_persistent = IsPersistent },
State = #mqstate { queue = Q, mode = mixed, is_durable = IsDurable,
msg_buf = MsgBuf, length = Length }) ->
OnDisk = IsDurable andalso IsPersistent,
+ {MsgBin, _MsgSize} = msg_to_bin(Msg),
ok = if OnDisk ->
- rabbit_disk_queue:publish(Q, MsgId, msg_to_bin(Msg), false);
+ rabbit_disk_queue:publish(Q, MsgId, MsgBin, false);
true -> ok
end,
{ok, State #mqstate { msg_buf = queue:in({Msg, false, OnDisk}, MsgBuf),
@@ -205,7 +217,8 @@ publish_delivered(Msg =
State = #mqstate { mode = Mode, is_durable = IsDurable,
queue = Q, length = 0 })
when Mode =:= disk orelse (IsDurable andalso IsPersistent) ->
- rabbit_disk_queue:publish(Q, MsgId, msg_to_bin(Msg), false),
+ {MsgBin, _MsgSize} = msg_to_bin(Msg),
+ rabbit_disk_queue:publish(Q, MsgId, MsgBin, false),
if IsDurable andalso IsPersistent ->
%% must call phantom_deliver otherwise the msg remains at
%% the head of the queue. This is synchronous, but
@@ -224,8 +237,8 @@ publish_delivered(_Msg, State = #mqstate { mode = mixed, length = 0 }) ->
deliver(State = #mqstate { length = 0 }) ->
{empty, State};
deliver(State = #mqstate { mode = disk, queue = Q, is_durable = IsDurable,
- length = Length }) ->
- {MsgId, MsgBin, _Size, IsDelivered, AckTag, Remaining}
+ length = Length, memory_size = QSize }) ->
+ {MsgId, MsgBin, Size, IsDelivered, AckTag, Remaining}
= rabbit_disk_queue:deliver(Q),
#basic_message { guid = MsgId, is_persistent = IsPersistent } =
Msg = bin_to_msg(MsgBin),
@@ -234,8 +247,7 @@ deliver(State = #mqstate { mode = disk, queue = Q, is_durable = IsDurable,
noack
end,
{{Msg, IsDelivered, AckTag1, Remaining},
- State #mqstate { length = Length - 1}};
-
+ State #mqstate { length = Length - 1, memory_size = QSize - Size }};
deliver(State = #mqstate { mode = mixed, queue = Q, is_durable = IsDurable,
msg_buf = MsgBuf, length = Length }) ->
{{value, {Msg = #basic_message { guid = MsgId,
@@ -269,13 +281,15 @@ ack(Acks, State = #mqstate { queue = Q }) ->
end.
tx_publish(Msg = #basic_message { guid = MsgId },
- State = #mqstate { mode = disk }) ->
- ok = rabbit_disk_queue:tx_publish(MsgId, msg_to_bin(Msg)),
- {ok, State};
+ State = #mqstate { mode = disk, memory_size = Size }) ->
+ {MsgBin, MsgSize} = msg_to_bin(Msg),
+ ok = rabbit_disk_queue:tx_publish(MsgId, MsgBin),
+ {ok, State #mqstate { memory_size = Size + MsgSize }};
tx_publish(Msg = #basic_message { guid = MsgId, is_persistent = IsPersistent },
State = #mqstate { mode = mixed, is_durable = IsDurable })
when IsDurable andalso IsPersistent ->
- ok = rabbit_disk_queue:tx_publish(MsgId, msg_to_bin(Msg)),
+ {MsgBin, _MsgSize} = msg_to_bin(Msg),
+ ok = rabbit_disk_queue:tx_publish(MsgId, MsgBin),
{ok, State};
tx_publish(_Msg, State = #mqstate { mode = mixed }) ->
%% this message will reappear in the tx_commit, so ignore for now
@@ -328,9 +342,15 @@ only_persistent_msg_ids(Pubs) ->
end
end, [], Pubs)).
-tx_cancel(Publishes, State = #mqstate { mode = disk }) ->
- ok = rabbit_disk_queue:tx_cancel(only_msg_ids(Publishes)),
- {ok, State};
+tx_cancel(Publishes, State = #mqstate { mode = disk, memory_size = TSize }) ->
+ {MsgIds, CSize} =
+ lists:foldl(
+ fun (Msg = #basic_message { guid = MsgId }, {MsgIdsAcc, CSizeAcc}) ->
+ {_MsgBin, MsgSize} = msg_to_bin(Msg),
+ {[MsgId | MsgIdsAcc], CSizeAcc + MsgSize}
+ end, {[], 0}, Publishes),
+ ok = rabbit_disk_queue:tx_cancel(lists:reverse(MsgIds)),
+ {ok, State #mqstate { memory_size = TSize - CSize }};
tx_cancel(Publishes,
State = #mqstate { mode = mixed, is_durable = IsDurable }) ->
ok =
@@ -343,26 +363,34 @@ tx_cancel(Publishes,
%% [{Msg, AckTag}]
requeue(MessagesWithAckTags, State = #mqstate { mode = disk, queue = Q,
is_durable = IsDurable,
- length = Length }) ->
+ length = Length,
+ memory_size = TSize
+ }) ->
%% here, we may have messages with no ack tags, because of the
%% fact they are not persistent, but nevertheless we want to
%% requeue them. This means publishing them delivered.
- Requeue
+ {Requeue, CSize}
= lists:foldl(
- fun ({#basic_message { is_persistent = IsPersistent }, AckTag}, RQ)
+ fun ({Msg = #basic_message { is_persistent = IsPersistent },
+ AckTag}, {RQ, SizeAcc})
when IsPersistent andalso IsDurable ->
- [AckTag | RQ];
- ({Msg = #basic_message { guid = MsgId }, _AckTag}, RQ) ->
+ {_MsgBin, MsgSize} = msg_to_bin(Msg),
+ {[AckTag | RQ], SizeAcc + MsgSize};
+ ({Msg = #basic_message { guid = MsgId }, _AckTag},
+ {RQ, SizeAcc}) ->
ok = if RQ == [] -> ok;
true -> rabbit_disk_queue:requeue(
Q, lists:reverse(RQ))
end,
+ {MsgBin, MsgSize} = msg_to_bin(Msg),
_AckTag1 = rabbit_disk_queue:publish(
- Q, MsgId, msg_to_bin(Msg), true),
- []
- end, [], MessagesWithAckTags),
+ Q, MsgId, MsgBin, true),
+ {[], SizeAcc + MsgSize}
+ end, {[], 0}, MessagesWithAckTags),
ok = rabbit_disk_queue:requeue(Q, lists:reverse(Requeue)),
- {ok, State #mqstate {length = Length + erlang:length(MessagesWithAckTags)}};
+ {ok, State #mqstate { length = Length + erlang:length(MessagesWithAckTags),
+ memory_size = TSize + CSize
+ }};
requeue(MessagesWithAckTags, State = #mqstate { mode = mixed, queue = Q,
msg_buf = MsgBuf,
is_durable = IsDurable,
@@ -387,14 +415,14 @@ requeue(MessagesWithAckTags, State = #mqstate { mode = mixed, queue = Q,
purge(State = #mqstate { queue = Q, mode = disk, length = Count }) ->
Count = rabbit_disk_queue:purge(Q),
- {Count, State #mqstate { length = 0 }};
+ {Count, State #mqstate { length = 0, memory_size = 0 }};
purge(State = #mqstate { queue = Q, mode = mixed, length = Length }) ->
rabbit_disk_queue:purge(Q),
{Length, State #mqstate { msg_buf = queue:new(), length = 0 }}.
delete_queue(State = #mqstate { queue = Q, mode = disk }) ->
rabbit_disk_queue:delete_queue(Q),
- {ok, State #mqstate { length = 0 }};
+ {ok, State #mqstate { length = 0, memory_size = 0 }};
delete_queue(State = #mqstate { queue = Q, mode = mixed }) ->
rabbit_disk_queue:delete_queue(Q),
{ok, State #mqstate { msg_buf = queue:new(), length = 0 }}.
@@ -404,3 +432,6 @@ length(#mqstate { length = Length }) ->
is_empty(#mqstate { length = Length }) ->
0 == Length.
+
+estimate_extra_memory(#mqstate { memory_size = Size }) ->
+ 2 * Size. %% Magic number. Will probably need playing with.