summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author: Matthew Sackman <matthew@lshift.net> 2009-08-20 15:01:36 +0100
committer: Matthew Sackman <matthew@lshift.net> 2009-08-20 15:01:36 +0100
commit1f4f6d0654c6d852313bdb066319f75600b2621f (patch)
tree3f06c8ad7c32dc890ee193e842cd48571541f949
parent06c83c2e075826a0391b627d8f09b3968a4200fc (diff)
downloadrabbitmq-server-git-1f4f6d0654c6d852313bdb066319f75600b2621f.tar.gz
Well, this was a very very sneaky bug.
1) create durable queue 2) send persistent msgs and send queue to disk_only mode (note, requires branch bug21444) 3) when done, set queue to mixed mode 4) send more persistent msgs 5) when done, wait for the prefetcher to do its thing 6) restart rabbit 7) observe that queue length is wrong Bugs fixed: o) in the to_disk_only_mode code in mixed_queue, msgs that had come out of the prefetcher weren't being acked. This meant that on a restart, the msgs would be recovered. Given that we have to requeue everything anyway (sometimes) in a mixed -> disk transition, we obviously have to ack these msgs before republishing them. Note that we do this as part of a tx_commit, so it's perfectly safe o) in the to_disk_only_mode code in mixed_queue, there was a recursion which swapped an IsDurable param with an IsDelivered param. This caused substantial fail. o) transaction commit coalescing is dangerous, especially when you're relying on calls to the disk queue to happen in order. For example, should you tx_publish, tx_commit and then auto_ack, or requeue_next_n, you would expect that those last calls get to see the msgs tx_published. This is not necessarily the case. A further good example is a tx_commit followed by a queue.delete. So, in the disk_queue for such calls, make sure that we flush properly, but also expose this functionality (it was already exposed, but as a cast, and although not absolutely necessary to be a call, if we're tx_committing anyway then that's a call, so another full round trip isn't a problem). One final note, there is no way that this bug would have been discovered and so easily replicated and debugged without the pinning code in bug 21444. We will seriously hamper our own ability to debug and aid clients should the new persister get released without 21444.
-rw-r--r--src/rabbit_disk_queue.erl22
-rw-r--r--src/rabbit_mixed_queue.erl56
2 files changed, 42 insertions, 36 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index e2f341ffd4..c1744d6695 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -354,7 +354,7 @@ to_ram_disk_mode() ->
gen_server2:pcall(?SERVER, 9, to_ram_disk_mode, infinity).
filesync() ->
- gen_server2:pcast(?SERVER, 10, filesync).
+ gen_server2:pcall(?SERVER, 9, filesync).
cache_info() ->
gen_server2:call(?SERVER, cache_info, infinity).
@@ -467,6 +467,8 @@ handle_call({tx_commit, Q, PubMsgIds, AckSeqIds}, From, State) ->
handle_call({purge, Q}, _From, State) ->
{ok, Count, State1} = internal_purge(Q, State),
reply(Count, State1);
+handle_call(filesync, _From, State) ->
+ reply(ok, sync_current_file_handle(State));
handle_call({length, Q}, _From, State = #dqstate { sequences = Sequences }) ->
{ReadSeqId, WriteSeqId} = sequence_lookup(Sequences, Q),
reply(WriteSeqId - ReadSeqId, State);
@@ -521,8 +523,6 @@ handle_cast({requeue_next_n, Q, N}, State) ->
handle_cast({delete_queue, Q}, State) ->
{ok, State1} = internal_delete_queue(Q, State),
noreply(State1);
-handle_cast(filesync, State) ->
- noreply(sync_current_file_handle(State));
handle_cast({set_mode, Mode}, State) ->
noreply((case Mode of
disk -> fun to_disk_only_mode/1;
@@ -909,9 +909,11 @@ internal_fetch(Q, ReadMsg, FakeDeliver, Advance,
end, State1}
end.
-internal_foldl(Q, Fun, Init, State = #dqstate { sequences = Sequences }) ->
+internal_foldl(Q, Fun, Init, State) ->
+ State1 = #dqstate { sequences = Sequences } =
+ sync_current_file_handle(State),
{ReadSeqId, WriteSeqId} = sequence_lookup(Sequences, Q),
- internal_foldl(Q, WriteSeqId, Fun, State, Init, ReadSeqId).
+ internal_foldl(Q, WriteSeqId, Fun, State1, Init, ReadSeqId).
internal_foldl(_Q, SeqId, _Fun, State, Acc, SeqId) ->
{ok, Acc, State};
@@ -1118,11 +1120,12 @@ internal_publish(Q, Message = #basic_message { guid = MsgId },
{ok, {MsgId, WriteSeqId}, State1}.
internal_tx_cancel(MsgIds, State) ->
+ State1 = sync_current_file_handle(State),
%% we don't need seq ids because we're not touching mnesia,
%% because seqids were never assigned
MsgSeqIds = lists:zip(MsgIds, lists:duplicate(erlang:length(MsgIds),
undefined)),
- remove_messages(undefined, MsgSeqIds, false, State).
+ remove_messages(undefined, MsgSeqIds, false, State1).
internal_requeue(_Q, [], State) ->
{ok, State};
@@ -1218,8 +1221,9 @@ internal_purge(Q, State = #dqstate { sequences = Sequences }) ->
end.
internal_delete_queue(Q, State) ->
- {ok, _Count, State1 = #dqstate { sequences = Sequences }} =
- internal_purge(Q, State), %% remove everything undelivered
+ State1 = sync_current_file_handle(State),
+ {ok, _Count, State2 = #dqstate { sequences = Sequences }} =
+ internal_purge(Q, State1), %% remove everything undelivered
true = ets:delete(Sequences, Q),
%% now remove everything already delivered
Objs = mnesia:dirty_match_object(
@@ -1233,7 +1237,7 @@ internal_delete_queue(Q, State) ->
fun (#dq_msg_loc { queue_and_seq_id = {_Q, SeqId},
msg_id = MsgId }) ->
{MsgId, SeqId} end, Objs),
- remove_messages(Q, MsgSeqIds, true, State1).
+ remove_messages(Q, MsgSeqIds, true, State2).
internal_delete_non_durable_queues(
DurableQueues, State = #dqstate { sequences = Sequences }) ->
diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl
index 2e67735f1b..f865b19a9c 100644
--- a/src/rabbit_mixed_queue.erl
+++ b/src/rabbit_mixed_queue.erl
@@ -145,7 +145,7 @@ set_mode(disk, TxnMessages, State =
%% Note we also batch together messages on disk so that we minimise
%% the calls to requeue.
{ok, MsgBuf3} =
- send_messages_to_disk(IsDurable, Q, MsgBuf1, 0, 0, [], queue:new()),
+ send_messages_to_disk(IsDurable, Q, MsgBuf1, 0, 0, [], [], queue:new()),
%% tx_publish txn messages. Some of these will have been already
%% published if they really are durable and persistent which is
%% why we can't just use our own tx_publish/2 function (would end
@@ -187,11 +187,11 @@ set_mode(mixed, TxnMessages, State = #mqstate { mode = disk, queue = Q,
{ok, State #mqstate { mode = mixed }}.
send_messages_to_disk(IsDurable, Q, Queue, PublishCount, RequeueCount,
- Commit, MsgBuf) ->
+ Commit, Ack, MsgBuf) ->
case queue:out(Queue) of
{empty, _Queue} ->
- ok = flush_messages_to_disk_queue(Q, Commit),
- [] = flush_requeue_to_disk_queue(Q, RequeueCount, []),
+ ok = flush_messages_to_disk_queue(Q, Commit, Ack),
+ {[], []} = flush_requeue_to_disk_queue(Q, RequeueCount, [], []),
{ok, MsgBuf};
{{value, {Msg = #basic_message { is_persistent = IsPersistent },
IsDelivered}}, Queue1} ->
@@ -199,49 +199,51 @@ send_messages_to_disk(IsDurable, Q, Queue, PublishCount, RequeueCount,
true -> %% it's already in the Q
send_messages_to_disk(
IsDurable, Q, Queue1, PublishCount, RequeueCount + 1,
- Commit, inc_queue_length(Q, MsgBuf, 1));
+ Commit, Ack, inc_queue_length(Q, MsgBuf, 1));
false ->
republish_message_to_disk_queue(
IsDurable, Q, Queue1, PublishCount, RequeueCount, Commit,
- MsgBuf, Msg, IsDelivered)
+ Ack, MsgBuf, Msg, IsDelivered)
end;
- {{value, {Msg, IsDelivered, _AckTag}}, Queue1} ->
+ {{value, {Msg, IsDelivered, AckTag}}, Queue1} ->
%% these have come via the prefetcher, so are no longer in
%% the disk queue so they need to be republished
- republish_message_to_disk_queue(IsDelivered, Q, Queue1,
- PublishCount, RequeueCount, Commit,
- MsgBuf, Msg, IsDelivered);
+ republish_message_to_disk_queue(
+ IsDurable, Q, Queue1, PublishCount, RequeueCount, Commit,
+ [AckTag | Ack], MsgBuf, Msg, IsDelivered);
{{value, {Q, Count}}, Queue1} ->
send_messages_to_disk(IsDurable, Q, Queue1, PublishCount,
- RequeueCount + Count, Commit,
+ RequeueCount + Count, Commit, Ack,
inc_queue_length(Q, MsgBuf, Count))
end.
republish_message_to_disk_queue(IsDurable, Q, Queue, PublishCount, RequeueCount,
- Commit, MsgBuf, Msg =
+ Commit, Ack, MsgBuf, Msg =
#basic_message { guid = MsgId }, IsDelivered) ->
- Commit1 = flush_requeue_to_disk_queue(Q, RequeueCount, Commit),
+ {Commit1, Ack1} = flush_requeue_to_disk_queue(Q, RequeueCount, Commit, Ack),
ok = rabbit_disk_queue:tx_publish(Msg),
- {PublishCount1, Commit2} =
+ {PublishCount1, Commit2, Ack2} =
case PublishCount == ?TO_DISK_MAX_FLUSH_SIZE of
- true -> ok = flush_messages_to_disk_queue(Q, Commit1),
- {1, [{MsgId, IsDelivered}]};
- false -> {PublishCount + 1, [{MsgId, IsDelivered} | Commit1]}
+ true -> ok = flush_messages_to_disk_queue(
+ Q, [{MsgId, IsDelivered} | Commit1], Ack1),
+ {0, [], []};
+ false -> {PublishCount + 1, [{MsgId, IsDelivered} | Commit1], Ack1}
end,
send_messages_to_disk(IsDurable, Q, Queue, PublishCount1, 0,
- Commit2, inc_queue_length(Q, MsgBuf, 1)).
+ Commit2, Ack2, inc_queue_length(Q, MsgBuf, 1)).
-flush_messages_to_disk_queue(_Q, []) ->
+flush_messages_to_disk_queue(_Q, [], []) ->
ok;
-flush_messages_to_disk_queue(Q, Commit) ->
- rabbit_disk_queue:tx_commit(Q, lists:reverse(Commit), []).
-
-flush_requeue_to_disk_queue(_Q, 0, Commit) ->
- Commit;
-flush_requeue_to_disk_queue(Q, RequeueCount, Commit) ->
- ok = flush_messages_to_disk_queue(Q, Commit),
+flush_messages_to_disk_queue(Q, Commit, Ack) ->
+ rabbit_disk_queue:tx_commit(Q, lists:reverse(Commit), Ack).
+
+flush_requeue_to_disk_queue(_Q, 0, Commit, Ack) ->
+ {Commit, Ack};
+flush_requeue_to_disk_queue(Q, RequeueCount, Commit, Ack) ->
+ ok = flush_messages_to_disk_queue(Q, Commit, Ack),
+ ok = rabbit_disk_queue:filesync(),
ok = rabbit_disk_queue:requeue_next_n(Q, RequeueCount),
- [].
+ {[], []}.
gain_memory(Inc, State = #mqstate { memory_size = QSize,
memory_gain = Gain }) ->