diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-08-20 15:01:36 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-08-20 15:01:36 +0100 |
| commit | 1f4f6d0654c6d852313bdb066319f75600b2621f (patch) | |
| tree | 3f06c8ad7c32dc890ee193e842cd48571541f949 | |
| parent | 06c83c2e075826a0391b627d8f09b3968a4200fc (diff) | |
| download | rabbitmq-server-git-1f4f6d0654c6d852313bdb066319f75600b2621f.tar.gz | |
Well, this was a very very sneaky bug.
1) create durable queue
2) send persistent msgs and send queue to disk_only mode (note, requires branch bug21444)
3) when done, set queue to mixed mode
4) send more persistent msgs
5) when done, wait for the prefetcher to do its thing
6) restart rabbit
7) observe that queue length is wrong
Bugs fixed:
o) in the to_disk_only_mode code in mixed_queue, msgs that had come out of the prefetcher weren't being acked. This meant that on a restart, the msgs would be recovered. Given that we have to requeue everything anyway (sometimes) in a mixed -> disk transition, we obviously have to ack these msgs before republishing them. Note that we do this as part of a tx_commit, so it's perfectly safe
o) in the to_disk_only_mode code in mixed_queue, there was a recursion which swapped an IsDurable param with an IsDelivered param. This caused substantial fail.
o) transaction commit coalescing is dangerous, especially when you're relying on calls to the disk queue to happen in order. For example, should you tx_publish, tx_commit and then auto_ack, or requeue_next_n, you would expect that those last calls get to see the msgs tx_published. This is not necessarily the case. A further good example is a tx_commit followed by a queue.delete. So, in the disk_queue for such calls, make sure that we flush properly, but also expose this functionality (it was already exposed, but as a cast, and although not absolutely necessary to be a call, if we're tx_commiting anyway then that's a call, so another full round trip isn't a problem).
One final note, there is no way that this bug would have been discovered and so easily replicated and debugged without the pinning code in bug 21444. We will seriously hamper our own ability to debug and aid clients should the new persister get released without 21444.
| -rw-r--r-- | src/rabbit_disk_queue.erl | 22 | ||||
| -rw-r--r-- | src/rabbit_mixed_queue.erl | 56 |
2 files changed, 42 insertions, 36 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index e2f341ffd4..c1744d6695 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -354,7 +354,7 @@ to_ram_disk_mode() -> gen_server2:pcall(?SERVER, 9, to_ram_disk_mode, infinity). filesync() -> - gen_server2:pcast(?SERVER, 10, filesync). + gen_server2:pcall(?SERVER, 9, filesync). cache_info() -> gen_server2:call(?SERVER, cache_info, infinity). @@ -467,6 +467,8 @@ handle_call({tx_commit, Q, PubMsgIds, AckSeqIds}, From, State) -> handle_call({purge, Q}, _From, State) -> {ok, Count, State1} = internal_purge(Q, State), reply(Count, State1); +handle_call(filesync, _From, State) -> + reply(ok, sync_current_file_handle(State)); handle_call({length, Q}, _From, State = #dqstate { sequences = Sequences }) -> {ReadSeqId, WriteSeqId} = sequence_lookup(Sequences, Q), reply(WriteSeqId - ReadSeqId, State); @@ -521,8 +523,6 @@ handle_cast({requeue_next_n, Q, N}, State) -> handle_cast({delete_queue, Q}, State) -> {ok, State1} = internal_delete_queue(Q, State), noreply(State1); -handle_cast(filesync, State) -> - noreply(sync_current_file_handle(State)); handle_cast({set_mode, Mode}, State) -> noreply((case Mode of disk -> fun to_disk_only_mode/1; @@ -909,9 +909,11 @@ internal_fetch(Q, ReadMsg, FakeDeliver, Advance, end, State1} end. -internal_foldl(Q, Fun, Init, State = #dqstate { sequences = Sequences }) -> +internal_foldl(Q, Fun, Init, State) -> + State1 = #dqstate { sequences = Sequences } = + sync_current_file_handle(State), {ReadSeqId, WriteSeqId} = sequence_lookup(Sequences, Q), - internal_foldl(Q, WriteSeqId, Fun, State, Init, ReadSeqId). + internal_foldl(Q, WriteSeqId, Fun, State1, Init, ReadSeqId). internal_foldl(_Q, SeqId, _Fun, State, Acc, SeqId) -> {ok, Acc, State}; @@ -1118,11 +1120,12 @@ internal_publish(Q, Message = #basic_message { guid = MsgId }, {ok, {MsgId, WriteSeqId}, State1}. internal_tx_cancel(MsgIds, State) -> + State1 = sync_current_file_handle(State), %% we don't need seq ids because we're not touching mnesia, %% because seqids were never assigned MsgSeqIds = lists:zip(MsgIds, lists:duplicate(erlang:length(MsgIds), undefined)), - remove_messages(undefined, MsgSeqIds, false, State). + remove_messages(undefined, MsgSeqIds, false, State1). internal_requeue(_Q, [], State) -> {ok, State}; @@ -1218,8 +1221,9 @@ internal_purge(Q, State = #dqstate { sequences = Sequences }) -> end. internal_delete_queue(Q, State) -> - {ok, _Count, State1 = #dqstate { sequences = Sequences }} = - internal_purge(Q, State), %% remove everything undelivered + State1 = sync_current_file_handle(State), + {ok, _Count, State2 = #dqstate { sequences = Sequences }} = + internal_purge(Q, State1), %% remove everything undelivered true = ets:delete(Sequences, Q), %% now remove everything already delivered Objs = mnesia:dirty_match_object( @@ -1233,7 +1237,7 @@ internal_delete_queue(Q, State) -> fun (#dq_msg_loc { queue_and_seq_id = {_Q, SeqId}, msg_id = MsgId }) -> {MsgId, SeqId} end, Objs), - remove_messages(Q, MsgSeqIds, true, State1). + remove_messages(Q, MsgSeqIds, true, State2). internal_delete_non_durable_queues( DurableQueues, State = #dqstate { sequences = Sequences }) -> diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl index 2e67735f1b..f865b19a9c 100644 --- a/src/rabbit_mixed_queue.erl +++ b/src/rabbit_mixed_queue.erl @@ -145,7 +145,7 @@ set_mode(disk, TxnMessages, State = %% Note we also batch together messages on disk so that we minimise %% the calls to requeue. {ok, MsgBuf3} = - send_messages_to_disk(IsDurable, Q, MsgBuf1, 0, 0, [], queue:new()), + send_messages_to_disk(IsDurable, Q, MsgBuf1, 0, 0, [], [], queue:new()), %% tx_publish txn messages. Some of these will have been already %% published if they really are durable and persistent which is %% why we can't just use our own tx_publish/2 function (would end @@ -187,11 +187,11 @@ set_mode(mixed, TxnMessages, State = #mqstate { mode = disk, queue = Q, {ok, State #mqstate { mode = mixed }}. send_messages_to_disk(IsDurable, Q, Queue, PublishCount, RequeueCount, - Commit, MsgBuf) -> + Commit, Ack, MsgBuf) -> case queue:out(Queue) of {empty, _Queue} -> - ok = flush_messages_to_disk_queue(Q, Commit), - [] = flush_requeue_to_disk_queue(Q, RequeueCount, []), + ok = flush_messages_to_disk_queue(Q, Commit, Ack), + {[], []} = flush_requeue_to_disk_queue(Q, RequeueCount, [], []), {ok, MsgBuf}; {{value, {Msg = #basic_message { is_persistent = IsPersistent }, IsDelivered}}, Queue1} -> @@ -199,49 +199,51 @@ send_messages_to_disk(IsDurable, Q, Queue, PublishCount, RequeueCount, true -> %% it's already in the Q send_messages_to_disk( IsDurable, Q, Queue1, PublishCount, RequeueCount + 1, - Commit, inc_queue_length(Q, MsgBuf, 1)); + Commit, Ack, inc_queue_length(Q, MsgBuf, 1)); false -> republish_message_to_disk_queue( IsDurable, Q, Queue1, PublishCount, RequeueCount, Commit, - MsgBuf, Msg, IsDelivered) + Ack, MsgBuf, Msg, IsDelivered) end; - {{value, {Msg, IsDelivered, _AckTag}}, Queue1} -> + {{value, {Msg, IsDelivered, AckTag}}, Queue1} -> %% these have come via the prefetcher, so are no longer in %% the disk queue so they need to be republished - republish_message_to_disk_queue(IsDelivered, Q, Queue1, - PublishCount, RequeueCount, Commit, - MsgBuf, Msg, IsDelivered); + republish_message_to_disk_queue( + IsDurable, Q, Queue1, PublishCount, RequeueCount, Commit, + [AckTag | Ack], MsgBuf, Msg, IsDelivered); {{value, {Q, Count}}, Queue1} -> send_messages_to_disk(IsDurable, Q, Queue1, PublishCount, - RequeueCount + Count, Commit, + RequeueCount + Count, Commit, Ack, inc_queue_length(Q, MsgBuf, Count)) end. republish_message_to_disk_queue(IsDurable, Q, Queue, PublishCount, RequeueCount, - Commit, MsgBuf, Msg = + Commit, Ack, MsgBuf, Msg = #basic_message { guid = MsgId }, IsDelivered) -> - Commit1 = flush_requeue_to_disk_queue(Q, RequeueCount, Commit), + {Commit1, Ack1} = flush_requeue_to_disk_queue(Q, RequeueCount, Commit, Ack), ok = rabbit_disk_queue:tx_publish(Msg), - {PublishCount1, Commit2} = + {PublishCount1, Commit2, Ack2} = case PublishCount == ?TO_DISK_MAX_FLUSH_SIZE of - true -> ok = flush_messages_to_disk_queue(Q, Commit1), - {1, [{MsgId, IsDelivered}]}; - false -> {PublishCount + 1, [{MsgId, IsDelivered} | Commit1]} + true -> ok = flush_messages_to_disk_queue( + Q, [{MsgId, IsDelivered} | Commit1], Ack1), + {0, [], []}; + false -> {PublishCount + 1, [{MsgId, IsDelivered} | Commit1], Ack1} end, send_messages_to_disk(IsDurable, Q, Queue, PublishCount1, 0, - Commit2, inc_queue_length(Q, MsgBuf, 1)). + Commit2, Ack2, inc_queue_length(Q, MsgBuf, 1)). -flush_messages_to_disk_queue(_Q, []) -> +flush_messages_to_disk_queue(_Q, [], []) -> ok; -flush_messages_to_disk_queue(Q, Commit) -> - rabbit_disk_queue:tx_commit(Q, lists:reverse(Commit), []). - -flush_requeue_to_disk_queue(_Q, 0, Commit) -> - Commit; -flush_requeue_to_disk_queue(Q, RequeueCount, Commit) -> - ok = flush_messages_to_disk_queue(Q, Commit), +flush_messages_to_disk_queue(Q, Commit, Ack) -> + rabbit_disk_queue:tx_commit(Q, lists:reverse(Commit), Ack). + +flush_requeue_to_disk_queue(_Q, 0, Commit, Ack) -> + {Commit, Ack}; +flush_requeue_to_disk_queue(Q, RequeueCount, Commit, Ack) -> + ok = flush_messages_to_disk_queue(Q, Commit, Ack), + ok = rabbit_disk_queue:filesync(), ok = rabbit_disk_queue:requeue_next_n(Q, RequeueCount), - []. + {[], []}. gain_memory(Inc, State = #mqstate { memory_size = QSize, memory_gain = Gain }) -> |
