diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-09-01 15:47:19 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-09-01 15:47:19 +0100 |
| commit | d4848d68012418c78c39fdbfc4a5bf38588c7501 (patch) | |
| tree | af40a6c0d15727beefabb6ee90e6188c65426375 | |
| parent | 590800c2b2711c79cc87981afc67b7f6d903532d (diff) | |
| download | rabbitmq-server-git-d4848d68012418c78c39fdbfc4a5bf38588c7501.tar.gz | |
Before the magic_marker_msg was introduced, if the queue was entirely on disk already and was then told to go to disk_only mode, it would form a single requeue_next_n call, where the N would be the length of the queue. This would be detected by the disk_queue and become a no-op. Because of the introduction of the magic_marker_msg, that is no longer possible - we want the marker to go from the back of the queue to the front. As such, the N will at most be 1 less than the queue length, causing unnecessary work to be done.
This patch removes unnecessary work by observing that we do not need to rotate the entire queue should we find that the queue consists of zero or more disk-stored msgs followed by zero or more ram-only stored messages. If this is the case, we only need to publish the latter ram-only messages, and have no need for the magic marker msgs at all. Furthermore, if there are no ram-only messages we have no work to do at all. The only situation in which we must rotate the entire queue is when we have ram-only messages followed by disk messages. In this case, we have to get the ram-only messages onto the disk queue before the disk messages, which requires the full rotation.
| -rw-r--r-- | src/rabbit_mixed_queue.erl | 102 |
1 files changed, 89 insertions, 13 deletions
diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl index bbec524b7f..2bb9c09a0e 100644 --- a/src/rabbit_mixed_queue.erl +++ b/src/rabbit_mixed_queue.erl @@ -362,20 +362,8 @@ set_storage_mode(disk, TxnMessages, State = queue:join(Fetched, MsgBuf2) end end, - %% (Re)enqueue _everything_ here. Note that due to batching going - %% on (see comments above send_messages_to_disk), if we crash - %% during this transition, we could have messages in the wrong - %% order on disk. Thus we publish a magic_marker_message which, - %% when this transition is compelete, will be back at the head of - %% the queue. Should we die, on startup, during the foldl over the - %% queue, we detect the marker message and requeue all the - %% messages in front of it, to the back of the queue, thus - %% correcting the order. The result is that everything ends up - %% back in the same order, but will have new sequence IDs. - ok = publish_magic_marker_message(Q), {ok, MsgBuf3} = - send_messages_to_disk(IsDurable, Q, MsgBuf1, 0, 0, [], [], queue:new()), - {ok, Length} = fetch_ack_magic_marker_message(Q), + send_messages_to_disk(IsDurable, Q, MsgBuf1, Length), %% tx_publish txn messages. Some of these will have been already %% published if they really are durable and persistent which is %% why we can't just use our own tx_publish/2 function (would end @@ -414,6 +402,90 @@ set_storage_mode(mixed, TxnMessages, State = garbage_collect(), {ok, State #mqstate { mode = mixed }}. +send_messages_to_disk(_IsDurable, _Q, MsgBuf, 0) -> + {ok, MsgBuf}; +send_messages_to_disk(IsDurable, Q, MsgBuf, Length) -> + case scan_for_disk_after_ram(IsDurable, MsgBuf) of + disk_only -> + %% Everything on disk already, we don't need to do + %% anything + {ok, inc_queue_length(queue:new(), Length)}; + {not_found, PrefixLen, MsgBufRAMSuffix} -> + %% No disk msgs follow RAM msgs and the queue has a RAM + %% suffix, so we can just publish those. If we crash at + %% this point, we may lose some messages, but everything + %% will remain in the right order, so no need for the + %% marker messages. + MsgBuf1 = inc_queue_length(queue:new(), PrefixLen), + send_messages_to_disk(IsDurable, Q, MsgBufRAMSuffix, 0, 0, [], [], + MsgBuf1); + found -> + %% There are disk msgs *after* ram msgs in the queue. We + %% need to reenqueue everything. Note that due to batching + %% going on (see comments above send_messages_to_disk/8), + %% if we crash during this transition, we could have + %% messages in the wrong order on disk. Thus we publish a + %% magic_marker_message which, when this transition is + %% complete, will be back at the head of the queue. Should + %% we die, on startup, during the foldl over the queue, we + %% detect the marker message and requeue all the messages + %% in front of it, to the back of the queue, thus + %% correcting the order. The result is that everything + %% ends up back in the same order, but will have new + %% sequence IDs. + ok = publish_magic_marker_message(Q), + {ok, MsgBuf1} = + send_messages_to_disk(IsDurable, Q, MsgBuf, 0, 0, [], [], + queue:new()), + {ok, Length} = fetch_ack_magic_marker_message(Q), + {ok, MsgBuf1} + end. + +scan_for_disk_after_ram(IsDurable, MsgBuf) -> + scan_for_disk_after_ram(IsDurable, MsgBuf, {disk, 0}). + +%% We return 'disk_only' if everything is alread on disk; 'found' if +%% we find a disk message after finding RAM messages; and +%% {'not_found', Count, MsgBuf} otherwise, where Count is the length +%% of the disk prefix, and MsgBuf is the RAM suffix of the MsgBuf +%% argument. Note msgs via the prefetcher are counted as RAM msgs on +%% the grounds that they have to be republished. +scan_for_disk_after_ram(IsDurable, MsgBuf, Mode) -> + case queue:out(MsgBuf) of + {empty, _MsgBuf} -> + case Mode of + {ram, N, MsgBuf1} -> {not_found, N, MsgBuf1}; + {disk, _N} -> disk_only + end; + {{value, {on_disk, Count}}, MsgBuf1} -> + case Mode of + {ram, _, _} -> found; %% found disk after RAM, bad + {disk, N} -> scan_for_disk_after_ram(IsDurable, MsgBuf1, + {disk, N + Count}) + end; + {{value, {_Msg, _IsDelivered, _AckTag}}, MsgBuf1} -> + %% found a msg from the prefetcher. Ensure RAM mode + scan_for_disk_after_ram(IsDurable, MsgBuf1, + ensure_ram(Mode, MsgBuf)); + {{value, + {#basic_message { is_persistent = IsPersistent }, _IsDelivered}}, + MsgBuf1} -> + %% normal message + case IsDurable andalso IsPersistent of + true -> + case Mode of + {ram, _, _} -> found; %% found disk after RAM, bad + {disk, N} -> scan_for_disk_after_ram(IsDurable, MsgBuf1, + {disk, N + 1}) + end; + false -> scan_for_disk_after_ram(IsDurable, MsgBuf1, + ensure_ram(Mode, MsgBuf)) + end + end. + +ensure_ram(Obj = {ram, _N, _MsgBuf}, _MsgBuf1) -> Obj; +ensure_ram({disk, N}, MsgBuf) -> {ram, N, MsgBuf}. + %% (Re)enqueue _everything_ here. Messages which are not on disk will %% be tx_published, messages that are on disk will be requeued to the %% end of the queue. This is done in batches, where a batch consists @@ -486,6 +558,10 @@ flush_requeue_to_disk_queue(Q, RequeueCount, Commit, Ack) -> ok = rabbit_disk_queue:requeue_next_n(Q, RequeueCount), {[], []}. +%% Scaling this by 4 is a magic number. Found by trial and error to +%% work ok. We are deliberately over reporting so that we run out of +%% memory sooner rather than later, because the transition to disk +%% only modes transiently can take quite a lot of memory. estimate_queue_memory(State = #mqstate { memory_size = Size }) -> {State, 4 * Size}. |
