diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-08-27 12:07:56 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-08-27 12:07:56 +0100 |
| commit | 9621889821c49e51e44f6298eeea10767f2825a5 (patch) | |
| tree | 93907a8cf7e9565130d6226391f86eb19107574a /src | |
| parent | 5fc13e26ff8a1a0b5dbacec2aa19326fbbd8cd1f (diff) | |
| download | rabbitmq-server-git-9621889821c49e51e44f6298eeea10767f2825a5.tar.gz | |
documentation
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_mixed_queue.erl | 34 |
1 files changed, 26 insertions, 8 deletions
diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl index c6f71fa638..f0dcea1562 100644 --- a/src/rabbit_mixed_queue.erl +++ b/src/rabbit_mixed_queue.erl @@ -366,12 +366,16 @@ set_storage_mode(disk, TxnMessages, State = queue:join(Fetched, MsgBuf2) end end, - %% We enqueue _everything_ here. This means that should a message - %% already be in the disk queue we must remove it and add it back - %% in. Fortunately, by using requeue, we avoid rewriting the - %% message on disk. - %% Note we also batch together messages on disk so that we minimise - %% the calls to requeue. + %% (Re)enqueue _everything_ here. Note that due to batching going + %% on (see comments above send_messages_to_disk), if we crash + %% during this transition, we could have messages in the wrong + %% order on disk. Thus we publish a magic_marker_message which, + %% when this transition is compelete, will be back at the head of + %% the queue. Should we die, on startup, during the foldl over the + %% queue, we detect the marker message and requeue all the + %% messages in front of it, to the back of the queue, thus + %% correcting the order. The result is that everything ends up + %% back in the same order, but will have new sequence IDs. ok = publish_magic_marker_message(Q), {ok, MsgBuf3} = send_messages_to_disk(IsDurable, Q, MsgBuf1, 0, 0, [], [], queue:new()), @@ -414,6 +418,16 @@ set_storage_mode(mixed, TxnMessages, State = garbage_collect(), {ok, State #mqstate { mode = mixed }}. +%% (Re)enqueue _everything_ here. Messages which are not on disk will +%% be tx_published, messages that are on disk will be requeued to the +%% end of the queue. This is done in batches, where a batch consists +%% of a number a tx_publishes, a tx_commit and then a call to +%% requeue_next_n. We do not want to fetch messages off disk only to +%% republish them later. Note in the tx_commit, we ack messages which +%% are being _re_published. These are messages that have been fetched +%% by the prefetcher. +%% Batches are limited in size to make sure that the resultant mnesia +%% transaction on tx_commit does not get too big, memory wise. send_messages_to_disk(IsDurable, Q, Queue, PublishCount, RequeueCount, Commit, Ack, MsgBuf) -> case queue:out(Queue) of @@ -434,8 +448,12 @@ send_messages_to_disk(IsDurable, Q, Queue, PublishCount, RequeueCount, Ack, MsgBuf, Msg, IsDelivered) end; {{value, {Msg, IsDelivered, AckTag}}, Queue1} -> - %% these have come via the prefetcher, so are no longer in - %% the disk queue so they need to be republished + %% These have come via the prefetcher, so are no longer in + %% the disk queue (yes, they've not been ack'd yet, but + %% the head of the queue has passed these messages). We + %% need to requeue them, which we sneakily achieve by + %% tx_publishing them, and then in the tx_commit, ack the + %% old copy. republish_message_to_disk_queue( IsDurable, Q, Queue1, PublishCount, RequeueCount, Commit, [AckTag | Ack], MsgBuf, Msg, IsDelivered); |
