summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_mixed_queue.erl102
1 files changed, 89 insertions, 13 deletions
diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl
index bbec524b7f..2bb9c09a0e 100644
--- a/src/rabbit_mixed_queue.erl
+++ b/src/rabbit_mixed_queue.erl
@@ -362,20 +362,8 @@ set_storage_mode(disk, TxnMessages, State =
queue:join(Fetched, MsgBuf2)
end
end,
- %% (Re)enqueue _everything_ here. Note that due to batching going
- %% on (see comments above send_messages_to_disk), if we crash
- %% during this transition, we could have messages in the wrong
- %% order on disk. Thus we publish a magic_marker_message which,
- %% when this transition is compelete, will be back at the head of
- %% the queue. Should we die, on startup, during the foldl over the
- %% queue, we detect the marker message and requeue all the
- %% messages in front of it, to the back of the queue, thus
- %% correcting the order. The result is that everything ends up
- %% back in the same order, but will have new sequence IDs.
- ok = publish_magic_marker_message(Q),
{ok, MsgBuf3} =
- send_messages_to_disk(IsDurable, Q, MsgBuf1, 0, 0, [], [], queue:new()),
- {ok, Length} = fetch_ack_magic_marker_message(Q),
+ send_messages_to_disk(IsDurable, Q, MsgBuf1, Length),
%% tx_publish txn messages. Some of these will have been already
%% published if they really are durable and persistent which is
%% why we can't just use our own tx_publish/2 function (would end
@@ -414,6 +402,90 @@ set_storage_mode(mixed, TxnMessages, State =
garbage_collect(),
{ok, State #mqstate { mode = mixed }}.
+send_messages_to_disk(_IsDurable, _Q, MsgBuf, 0) ->
+ {ok, MsgBuf};
+send_messages_to_disk(IsDurable, Q, MsgBuf, Length) ->
+ case scan_for_disk_after_ram(IsDurable, MsgBuf) of
+ disk_only ->
+ %% Everything on disk already, we don't need to do
+ %% anything
+ {ok, inc_queue_length(queue:new(), Length)};
+ {not_found, PrefixLen, MsgBufRAMSuffix} ->
+ %% No disk msgs follow RAM msgs and the queue has a RAM
+ %% suffix, so we can just publish those. If we crash at
+ %% this point, we may lose some messages, but everything
+ %% will remain in the right order, so no need for the
+ %% marker messages.
+ MsgBuf1 = inc_queue_length(queue:new(), PrefixLen),
+ send_messages_to_disk(IsDurable, Q, MsgBufRAMSuffix, 0, 0, [], [],
+ MsgBuf1);
+ found ->
+ %% There are disk msgs *after* ram msgs in the queue. We
+ %% need to reenqueue everything. Note that due to batching
+ %% going on (see comments above send_messages_to_disk/8),
+ %% if we crash during this transition, we could have
+ %% messages in the wrong order on disk. Thus we publish a
+ %% magic_marker_message which, when this transition is
+ %% complete, will be back at the head of the queue. Should
+ %% we die, on startup, during the foldl over the queue, we
+ %% detect the marker message and requeue all the messages
+ %% in front of it, to the back of the queue, thus
+ %% correcting the order. The result is that everything
+ %% ends up back in the same order, but will have new
+ %% sequence IDs.
+ ok = publish_magic_marker_message(Q),
+ {ok, MsgBuf1} =
+ send_messages_to_disk(IsDurable, Q, MsgBuf, 0, 0, [], [],
+ queue:new()),
+ {ok, Length} = fetch_ack_magic_marker_message(Q),
+ {ok, MsgBuf1}
+ end.
+
+scan_for_disk_after_ram(IsDurable, MsgBuf) ->
+ scan_for_disk_after_ram(IsDurable, MsgBuf, {disk, 0}).
+
+%% We return 'disk_only' if everything is alread on disk; 'found' if
+%% we find a disk message after finding RAM messages; and
+%% {'not_found', Count, MsgBuf} otherwise, where Count is the length
+%% of the disk prefix, and MsgBuf is the RAM suffix of the MsgBuf
+%% argument. Note msgs via the prefetcher are counted as RAM msgs on
+%% the grounds that they have to be republished.
+scan_for_disk_after_ram(IsDurable, MsgBuf, Mode) ->
+ case queue:out(MsgBuf) of
+ {empty, _MsgBuf} ->
+ case Mode of
+ {ram, N, MsgBuf1} -> {not_found, N, MsgBuf1};
+ {disk, _N} -> disk_only
+ end;
+ {{value, {on_disk, Count}}, MsgBuf1} ->
+ case Mode of
+ {ram, _, _} -> found; %% found disk after RAM, bad
+ {disk, N} -> scan_for_disk_after_ram(IsDurable, MsgBuf1,
+ {disk, N + Count})
+ end;
+ {{value, {_Msg, _IsDelivered, _AckTag}}, MsgBuf1} ->
+ %% found a msg from the prefetcher. Ensure RAM mode
+ scan_for_disk_after_ram(IsDurable, MsgBuf1,
+ ensure_ram(Mode, MsgBuf));
+ {{value,
+ {#basic_message { is_persistent = IsPersistent }, _IsDelivered}},
+ MsgBuf1} ->
+ %% normal message
+ case IsDurable andalso IsPersistent of
+ true ->
+ case Mode of
+ {ram, _, _} -> found; %% found disk after RAM, bad
+ {disk, N} -> scan_for_disk_after_ram(IsDurable, MsgBuf1,
+ {disk, N + 1})
+ end;
+ false -> scan_for_disk_after_ram(IsDurable, MsgBuf1,
+ ensure_ram(Mode, MsgBuf))
+ end
+ end.
+
+ensure_ram(Obj = {ram, _N, _MsgBuf}, _MsgBuf1) -> Obj;
+ensure_ram({disk, N}, MsgBuf) -> {ram, N, MsgBuf}.
+
%% (Re)enqueue _everything_ here. Messages which are not on disk will
%% be tx_published, messages that are on disk will be requeued to the
%% end of the queue. This is done in batches, where a batch consists
@@ -486,6 +558,10 @@ flush_requeue_to_disk_queue(Q, RequeueCount, Commit, Ack) ->
ok = rabbit_disk_queue:requeue_next_n(Q, RequeueCount),
{[], []}.
+%% Scaling this by 4 is a magic number. Found by trial and error to
+%% work ok. We are deliberately over reporting so that we run out of
+%% memory sooner rather than later, because the transition to disk
+%% only modes transiently can take quite a lot of memory.
estimate_queue_memory(State = #mqstate { memory_size = Size }) ->
{State, 4 * Size}.