diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_mixed_queue.erl | 133 |
1 files changed, 82 insertions, 51 deletions
diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl index 1793b6359d..edbc51a63f 100644 --- a/src/rabbit_mixed_queue.erl +++ b/src/rabbit_mixed_queue.erl @@ -39,13 +39,14 @@ tx_publish/2, tx_commit/3, tx_cancel/2, requeue/2, purge/1, length/1, is_empty/1, delete_queue/1]). --export([to_disk_only_mode/1, to_mixed_mode/1]). +-export([to_disk_only_mode/1, to_mixed_mode/1, estimate_extra_memory/1]). -record(mqstate, { mode, msg_buf, queue, is_durable, - length + length, + memory_size } ). @@ -56,7 +57,8 @@ msg_buf :: queue(), queue :: queue_name(), is_durable :: bool(), - length :: non_neg_integer() + length :: non_neg_integer(), + memory_size :: non_neg_integer() }). -type(acktag() :: ( 'noack' | { non_neg_integer(), non_neg_integer() })). -type(okmqs() :: {'ok', mqstate()}). @@ -79,13 +81,14 @@ -spec(length/1 :: (mqstate()) -> non_neg_integer()). -spec(is_empty/1 :: (mqstate()) -> bool()). +-spec(estimate_extra_memory/1 :: (mqstate()) -> non_neg_integer). -endif. init(Queue, IsDurable, disk) -> purge_non_persistent_messages( #mqstate { mode = disk, msg_buf = queue:new(), queue = Queue, - is_durable = IsDurable, length = 0 }); + is_durable = IsDurable, length = 0, memory_size = 0 }); init(Queue, IsDurable, mixed) -> {ok, State} = init(Queue, IsDurable, disk), to_mixed_mode(State). @@ -102,30 +105,35 @@ to_disk_only_mode(State = %% Note we also batch together messages on disk so that we minimise %% the calls to requeue. Msgs = queue:to_list(MsgBuf), - Requeue = + {Requeue, Size} = lists:foldl( fun ({Msg = #basic_message { guid = MsgId }, IsDelivered, OnDisk}, - RQueueAcc) -> - if OnDisk -> - {MsgId, IsDelivered, AckTag, _PersistRemaining} = - rabbit_disk_queue:phantom_deliver(Q), - [ {AckTag, {next, IsDelivered}} | RQueueAcc ]; - true -> - ok = if [] == RQueueAcc -> ok; - true -> - rabbit_disk_queue:requeue_with_seqs( - Q, lists:reverse(RQueueAcc)) - end, - ok = rabbit_disk_queue:publish( - Q, MsgId, msg_to_bin(Msg), false), - [] - end - end, [], Msgs), + {RQueueAcc, SizeAcc}) -> + {MsgBin, MsgSize} = msg_to_bin(Msg), + SizeAcc1 = SizeAcc + MsgSize, + RQueueAcc1 = + if OnDisk -> + {MsgId, IsDelivered, AckTag, _PersistRemaining} = + rabbit_disk_queue:phantom_deliver(Q), + [ {AckTag, {next, IsDelivered}} | RQueueAcc ]; + true -> + ok = if [] == RQueueAcc -> ok; + true -> + rabbit_disk_queue:requeue_with_seqs( + Q, lists:reverse(RQueueAcc)) + end, + ok = rabbit_disk_queue:publish( + Q, MsgId, MsgBin, false), + [] + end, + {RQueueAcc1, SizeAcc1} + end, {[], 0}, Msgs), ok = if [] == Requeue -> ok; true -> rabbit_disk_queue:requeue_with_seqs(Q, lists:reverse(Requeue)) end, - {ok, State #mqstate { mode = disk, msg_buf = queue:new() }}. + {ok, + State #mqstate { mode = disk, msg_buf = queue:new(), memory_size = Size }}. to_mixed_mode(State = #mqstate { mode = mixed }) -> {ok, State}; @@ -141,7 +149,7 @@ to_mixed_mode(State = #mqstate { mode = disk, queue = Q, length = Length }) -> Msg = #basic_message { guid = MsgId } = bin_to_msg(MsgBin), {queue:in({Msg, IsDelivered, true}, Buf), L+1} end, {queue:new(), 0}, QList), - {ok, State #mqstate { mode = mixed, msg_buf = MsgBuf1 }}. + {ok, State #mqstate { mode = mixed, msg_buf = MsgBuf1, memory_size = 0 }}. purge_non_persistent_messages(State = #mqstate { mode = disk, queue = Q, is_durable = IsDurable }) -> @@ -178,21 +186,25 @@ deliver_all_messages(Q, IsDurable, Acks, Requeue, Length) -> msg_to_bin(Msg = #basic_message { content = Content }) -> ClearedContent = rabbit_binary_parser:clear_decoded_content(Content), - term_to_binary(Msg #basic_message { content = ClearedContent }). + Bin = term_to_binary(Msg #basic_message { content = ClearedContent }), + {Bin, size(Bin)}. bin_to_msg(MsgBin) -> binary_to_term(MsgBin). publish(Msg = #basic_message { guid = MsgId }, - State = #mqstate { mode = disk, queue = Q, length = Length }) -> - ok = rabbit_disk_queue:publish(Q, MsgId, msg_to_bin(Msg), false), - {ok, State #mqstate { length = Length + 1 }}; + State = #mqstate { mode = disk, queue = Q, length = Length, + memory_size = Size}) -> + {MsgBin, MsgSize} = msg_to_bin(Msg), + ok = rabbit_disk_queue:publish(Q, MsgId, MsgBin, false), + {ok, State #mqstate { length = Length + 1, memory_size = Size + MsgSize }}; publish(Msg = #basic_message { guid = MsgId, is_persistent = IsPersistent }, State = #mqstate { queue = Q, mode = mixed, is_durable = IsDurable, msg_buf = MsgBuf, length = Length }) -> OnDisk = IsDurable andalso IsPersistent, + {MsgBin, _MsgSize} = msg_to_bin(Msg), ok = if OnDisk -> - rabbit_disk_queue:publish(Q, MsgId, msg_to_bin(Msg), false); + rabbit_disk_queue:publish(Q, MsgId, MsgBin, false); true -> ok end, {ok, State #mqstate { msg_buf = queue:in({Msg, false, OnDisk}, MsgBuf), @@ -205,7 +217,8 @@ publish_delivered(Msg = State = #mqstate { mode = Mode, is_durable = IsDurable, queue = Q, length = 0 }) when Mode =:= disk orelse (IsDurable andalso IsPersistent) -> - rabbit_disk_queue:publish(Q, MsgId, msg_to_bin(Msg), false), + {MsgBin, _MsgSize} = msg_to_bin(Msg), + rabbit_disk_queue:publish(Q, MsgId, MsgBin, false), if IsDurable andalso IsPersistent -> %% must call phantom_deliver otherwise the msg remains at %% the head of the queue. This is synchronous, but @@ -224,8 +237,8 @@ publish_delivered(_Msg, State = #mqstate { mode = mixed, length = 0 }) -> deliver(State = #mqstate { length = 0 }) -> {empty, State}; deliver(State = #mqstate { mode = disk, queue = Q, is_durable = IsDurable, - length = Length }) -> - {MsgId, MsgBin, _Size, IsDelivered, AckTag, Remaining} + length = Length, memory_size = QSize }) -> + {MsgId, MsgBin, Size, IsDelivered, AckTag, Remaining} = rabbit_disk_queue:deliver(Q), #basic_message { guid = MsgId, is_persistent = IsPersistent } = Msg = bin_to_msg(MsgBin), @@ -234,8 +247,7 @@ deliver(State = #mqstate { mode = disk, queue = Q, is_durable = IsDurable, noack end, {{Msg, IsDelivered, AckTag1, Remaining}, - State #mqstate { length = Length - 1}}; - + State #mqstate { length = Length - 1, memory_size = QSize - Size }}; deliver(State = #mqstate { mode = mixed, queue = Q, is_durable = IsDurable, msg_buf = MsgBuf, length = Length }) -> {{value, {Msg = #basic_message { guid = MsgId, @@ -269,13 +281,15 @@ ack(Acks, State = #mqstate { queue = Q }) -> end. tx_publish(Msg = #basic_message { guid = MsgId }, - State = #mqstate { mode = disk }) -> - ok = rabbit_disk_queue:tx_publish(MsgId, msg_to_bin(Msg)), - {ok, State}; + State = #mqstate { mode = disk, memory_size = Size }) -> + {MsgBin, MsgSize} = msg_to_bin(Msg), + ok = rabbit_disk_queue:tx_publish(MsgId, MsgBin), + {ok, State #mqstate { memory_size = Size + MsgSize }}; tx_publish(Msg = #basic_message { guid = MsgId, is_persistent = IsPersistent }, State = #mqstate { mode = mixed, is_durable = IsDurable }) when IsDurable andalso IsPersistent -> - ok = rabbit_disk_queue:tx_publish(MsgId, msg_to_bin(Msg)), + {MsgBin, _MsgSize} = msg_to_bin(Msg), + ok = rabbit_disk_queue:tx_publish(MsgId, MsgBin), {ok, State}; tx_publish(_Msg, State = #mqstate { mode = mixed }) -> %% this message will reappear in the tx_commit, so ignore for now @@ -328,9 +342,15 @@ only_persistent_msg_ids(Pubs) -> end end, [], Pubs)). -tx_cancel(Publishes, State = #mqstate { mode = disk }) -> - ok = rabbit_disk_queue:tx_cancel(only_msg_ids(Publishes)), - {ok, State}; +tx_cancel(Publishes, State = #mqstate { mode = disk, memory_size = TSize }) -> + {MsgIds, CSize} = + lists:foldl( + fun (Msg = #basic_message { guid = MsgId }, {MsgIdsAcc, CSizeAcc}) -> + {_MsgBin, MsgSize} = msg_to_bin(Msg), + {[MsgId | MsgIdsAcc], CSizeAcc + MsgSize} + end, {[], 0}, Publishes), + ok = rabbit_disk_queue:tx_cancel(lists:reverse(MsgIds)), + {ok, State #mqstate { memory_size = TSize - CSize }}; tx_cancel(Publishes, State = #mqstate { mode = mixed, is_durable = IsDurable }) -> ok = @@ -343,26 +363,34 @@ tx_cancel(Publishes, %% [{Msg, AckTag}] requeue(MessagesWithAckTags, State = #mqstate { mode = disk, queue = Q, is_durable = IsDurable, - length = Length }) -> + length = Length, + memory_size = TSize + }) -> %% here, we may have messages with no ack tags, because of the %% fact they are not persistent, but nevertheless we want to %% requeue them. This means publishing them delivered. - Requeue + {Requeue, CSize} = lists:foldl( - fun ({#basic_message { is_persistent = IsPersistent }, AckTag}, RQ) + fun ({Msg = #basic_message { is_persistent = IsPersistent }, + AckTag}, {RQ, SizeAcc}) when IsPersistent andalso IsDurable -> - [AckTag | RQ]; - ({Msg = #basic_message { guid = MsgId }, _AckTag}, RQ) -> + {_MsgBin, MsgSize} = msg_to_bin(Msg), + {[AckTag | RQ], SizeAcc + MsgSize}; + ({Msg = #basic_message { guid = MsgId }, _AckTag}, + {RQ, SizeAcc}) -> ok = if RQ == [] -> ok; true -> rabbit_disk_queue:requeue( Q, lists:reverse(RQ)) end, + {MsgBin, MsgSize} = msg_to_bin(Msg), _AckTag1 = rabbit_disk_queue:publish( - Q, MsgId, msg_to_bin(Msg), true), - [] - end, [], MessagesWithAckTags), + Q, MsgId, MsgBin, true), + {[], SizeAcc + MsgSize} + end, {[], 0}, MessagesWithAckTags), ok = rabbit_disk_queue:requeue(Q, lists:reverse(Requeue)), - {ok, State #mqstate {length = Length + erlang:length(MessagesWithAckTags)}}; + {ok, State #mqstate { length = Length + erlang:length(MessagesWithAckTags), + memory_size = TSize + CSize + }}; requeue(MessagesWithAckTags, State = #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf, is_durable = IsDurable, @@ -387,14 +415,14 @@ requeue(MessagesWithAckTags, State = #mqstate { mode = mixed, queue = Q, purge(State = #mqstate { queue = Q, mode = disk, length = Count }) -> Count = rabbit_disk_queue:purge(Q), - {Count, State #mqstate { length = 0 }}; + {Count, State #mqstate { length = 0, memory_size = 0 }}; purge(State = #mqstate { queue = Q, mode = mixed, length = Length }) -> rabbit_disk_queue:purge(Q), {Length, State #mqstate { msg_buf = queue:new(), length = 0 }}. delete_queue(State = #mqstate { queue = Q, mode = disk }) -> rabbit_disk_queue:delete_queue(Q), - {ok, State #mqstate { length = 0 }}; + {ok, State #mqstate { length = 0, memory_size = 0 }}; delete_queue(State = #mqstate { queue = Q, mode = mixed }) -> rabbit_disk_queue:delete_queue(Q), {ok, State #mqstate { msg_buf = queue:new(), length = 0 }}. @@ -404,3 +432,6 @@ length(#mqstate { length = Length }) -> is_empty(#mqstate { length = Length }) -> 0 == Length. + +estimate_extra_memory(#mqstate { memory_size = Size }) -> + 2 * Size. %% Magic number. Will probably need playing with. |
