summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-08-27 12:07:56 +0100
committerMatthew Sackman <matthew@lshift.net>2009-08-27 12:07:56 +0100
commit9621889821c49e51e44f6298eeea10767f2825a5 (patch)
tree93907a8cf7e9565130d6226391f86eb19107574a /src
parent5fc13e26ff8a1a0b5dbacec2aa19326fbbd8cd1f (diff)
downloadrabbitmq-server-git-9621889821c49e51e44f6298eeea10767f2825a5.tar.gz
documentation
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_mixed_queue.erl34
1 files changed, 26 insertions, 8 deletions
diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl
index c6f71fa638..f0dcea1562 100644
--- a/src/rabbit_mixed_queue.erl
+++ b/src/rabbit_mixed_queue.erl
@@ -366,12 +366,16 @@ set_storage_mode(disk, TxnMessages, State =
queue:join(Fetched, MsgBuf2)
end
end,
- %% We enqueue _everything_ here. This means that should a message
- %% already be in the disk queue we must remove it and add it back
- %% in. Fortunately, by using requeue, we avoid rewriting the
- %% message on disk.
- %% Note we also batch together messages on disk so that we minimise
- %% the calls to requeue.
+ %% (Re)enqueue _everything_ here. Note that due to batching going
+ %% on (see comments above send_messages_to_disk), if we crash
+ %% during this transition, we could have messages in the wrong
+ %% order on disk. Thus we publish a magic_marker_message which,
+ %% when this transition is compelete, will be back at the head of
+ %% the queue. Should we die, on startup, during the foldl over the
+ %% queue, we detect the marker message and requeue all the
+ %% messages in front of it, to the back of the queue, thus
+ %% correcting the order. The result is that everything ends up
+ %% back in the same order, but will have new sequence IDs.
ok = publish_magic_marker_message(Q),
{ok, MsgBuf3} =
send_messages_to_disk(IsDurable, Q, MsgBuf1, 0, 0, [], [], queue:new()),
@@ -414,6 +418,16 @@ set_storage_mode(mixed, TxnMessages, State =
garbage_collect(),
{ok, State #mqstate { mode = mixed }}.
+%% (Re)enqueue _everything_ here. Messages which are not on disk will
+%% be tx_published, messages that are on disk will be requeued to the
+%% end of the queue. This is done in batches, where a batch consists
+%% of a number a tx_publishes, a tx_commit and then a call to
+%% requeue_next_n. We do not want to fetch messages off disk only to
+%% republish them later. Note in the tx_commit, we ack messages which
+%% are being _re_published. These are messages that have been fetched
+%% by the prefetcher.
+%% Batches are limited in size to make sure that the resultant mnesia
+%% transaction on tx_commit does not get too big, memory wise.
send_messages_to_disk(IsDurable, Q, Queue, PublishCount, RequeueCount,
Commit, Ack, MsgBuf) ->
case queue:out(Queue) of
@@ -434,8 +448,12 @@ send_messages_to_disk(IsDurable, Q, Queue, PublishCount, RequeueCount,
Ack, MsgBuf, Msg, IsDelivered)
end;
{{value, {Msg, IsDelivered, AckTag}}, Queue1} ->
- %% these have come via the prefetcher, so are no longer in
- %% the disk queue so they need to be republished
+ %% These have come via the prefetcher, so are no longer in
+ %% the disk queue (yes, they've not been ack'd yet, but
+ %% the head of the queue has passed these messages). We
+ %% need to requeue them, which we sneakily achieve by
+ %% tx_publishing them, and then in the tx_commit, ack the
+ %% old copy.
republish_message_to_disk_queue(
IsDurable, Q, Queue1, PublishCount, RequeueCount, Commit,
[AckTag | Ack], MsgBuf, Msg, IsDelivered);