diff options
| -rw-r--r-- | src/rabbit_mixed_queue.erl | 102 |
1 files changed, 89 insertions, 13 deletions
diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl index bbec524b7f..2bb9c09a0e 100644 --- a/src/rabbit_mixed_queue.erl +++ b/src/rabbit_mixed_queue.erl @@ -362,20 +362,8 @@ set_storage_mode(disk, TxnMessages, State = queue:join(Fetched, MsgBuf2) end end, - %% (Re)enqueue _everything_ here. Note that due to batching going - %% on (see comments above send_messages_to_disk), if we crash - %% during this transition, we could have messages in the wrong - %% order on disk. Thus we publish a magic_marker_message which, - %% when this transition is compelete, will be back at the head of - %% the queue. Should we die, on startup, during the foldl over the - %% queue, we detect the marker message and requeue all the - %% messages in front of it, to the back of the queue, thus - %% correcting the order. The result is that everything ends up - %% back in the same order, but will have new sequence IDs. - ok = publish_magic_marker_message(Q), {ok, MsgBuf3} = - send_messages_to_disk(IsDurable, Q, MsgBuf1, 0, 0, [], [], queue:new()), - {ok, Length} = fetch_ack_magic_marker_message(Q), + send_messages_to_disk(IsDurable, Q, MsgBuf1, Length), %% tx_publish txn messages. Some of these will have been already %% published if they really are durable and persistent which is %% why we can't just use our own tx_publish/2 function (would end @@ -414,6 +402,90 @@ set_storage_mode(mixed, TxnMessages, State = garbage_collect(), {ok, State #mqstate { mode = mixed }}. +send_messages_to_disk(_IsDurable, _Q, MsgBuf, 0) -> + {ok, MsgBuf}; +send_messages_to_disk(IsDurable, Q, MsgBuf, Length) -> + case scan_for_disk_after_ram(IsDurable, MsgBuf) of + disk_only -> + %% Everything on disk already, we don't need to do + %% anything + {ok, inc_queue_length(queue:new(), Length)}; + {not_found, PrefixLen, MsgBufRAMSuffix} -> + %% No disk msgs follow RAM msgs and the queue has a RAM + %% suffix, so we can just publish those. 
If we crash at + %% this point, we may lose some messages, but everything + %% will remain in the right order, so no need for the + %% marker messages. + MsgBuf1 = inc_queue_length(queue:new(), PrefixLen), + send_messages_to_disk(IsDurable, Q, MsgBufRAMSuffix, 0, 0, [], [], + MsgBuf1); + found -> + %% There are disk msgs *after* ram msgs in the queue. We + %% need to reenqueue everything. Note that due to batching + %% going on (see comments above send_messages_to_disk/8), + %% if we crash during this transition, we could have + %% messages in the wrong order on disk. Thus we publish a + %% magic_marker_message which, when this transition is + %% complete, will be back at the head of the queue. Should + %% we die, on startup, during the foldl over the queue, we + %% detect the marker message and requeue all the messages + %% in front of it, to the back of the queue, thus + %% correcting the order. The result is that everything + %% ends up back in the same order, but will have new + %% sequence IDs. + ok = publish_magic_marker_message(Q), + {ok, MsgBuf1} = + send_messages_to_disk(IsDurable, Q, MsgBuf, 0, 0, [], [], + queue:new()), + {ok, Length} = fetch_ack_magic_marker_message(Q), + {ok, MsgBuf1} + end. + +scan_for_disk_after_ram(IsDurable, MsgBuf) -> + scan_for_disk_after_ram(IsDurable, MsgBuf, {disk, 0}). + +%% We return 'disk_only' if everything is already on disk; 'found' if +%% we find a disk message after finding RAM messages; and +%% {'not_found', Count, MsgBuf} otherwise, where Count is the length +%% of the disk prefix, and MsgBuf is the RAM suffix of the MsgBuf +%% argument. Note msgs via the prefetcher are counted as RAM msgs on +%% the grounds that they have to be republished. 
+scan_for_disk_after_ram(IsDurable, MsgBuf, Mode) -> + case queue:out(MsgBuf) of + {empty, _MsgBuf} -> + case Mode of + {ram, N, MsgBuf1} -> {not_found, N, MsgBuf1}; + {disk, _N} -> disk_only + end; + {{value, {on_disk, Count}}, MsgBuf1} -> + case Mode of + {ram, _, _} -> found; %% found disk after RAM, bad + {disk, N} -> scan_for_disk_after_ram(IsDurable, MsgBuf1, + {disk, N + Count}) + end; + {{value, {_Msg, _IsDelivered, _AckTag}}, MsgBuf1} -> + %% found a msg from the prefetcher. Ensure RAM mode + scan_for_disk_after_ram(IsDurable, MsgBuf1, + ensure_ram(Mode, MsgBuf)); + {{value, + {#basic_message { is_persistent = IsPersistent }, _IsDelivered}}, + MsgBuf1} -> + %% normal message + case IsDurable andalso IsPersistent of + true -> + case Mode of + {ram, _, _} -> found; %% found disk after RAM, bad + {disk, N} -> scan_for_disk_after_ram(IsDurable, MsgBuf1, + {disk, N + 1}) + end; + false -> scan_for_disk_after_ram(IsDurable, MsgBuf1, + ensure_ram(Mode, MsgBuf)) + end + end. + +ensure_ram(Obj = {ram, _N, _MsgBuf}, _MsgBuf1) -> Obj; +ensure_ram({disk, N}, MsgBuf) -> {ram, N, MsgBuf}. + %% (Re)enqueue _everything_ here. Messages which are not on disk will %% be tx_published, messages that are on disk will be requeued to the %% end of the queue. This is done in batches, where a batch consists @@ -486,6 +558,10 @@ flush_requeue_to_disk_queue(Q, RequeueCount, Commit, Ack) -> ok = rabbit_disk_queue:requeue_next_n(Q, RequeueCount), {[], []}. +%% Scaling this by 4 is a magic number. Found by trial and error to +%% work ok. We are deliberately over reporting so that we run out of +%% memory sooner rather than later, because the transition to disk +%% only modes transiently can take quite a lot of memory. estimate_queue_memory(State = #mqstate { memory_size = Size }) -> {State, 4 * Size}. |
