 src/rabbit_disk_queue.erl  | 22 +++++++++++-----------
 src/rabbit_mixed_queue.erl | 56 +++++++++++++++++++++++++++-----------------
 2 files changed, 42 insertions(+), 36 deletions(-)
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index e2f341ffd4..c1744d6695 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -354,7 +354,7 @@ to_ram_disk_mode() ->
     gen_server2:pcall(?SERVER, 9, to_ram_disk_mode, infinity).
 
 filesync() ->
-    gen_server2:pcast(?SERVER, 10, filesync).
+    gen_server2:pcall(?SERVER, 9, filesync).
 
 cache_info() ->
     gen_server2:call(?SERVER, cache_info, infinity).
@@ -467,6 +467,8 @@ handle_call({tx_commit, Q, PubMsgIds, AckSeqIds}, From, State) ->
 handle_call({purge, Q}, _From, State) ->
     {ok, Count, State1} = internal_purge(Q, State),
     reply(Count, State1);
+handle_call(filesync, _From, State) ->
+    reply(ok, sync_current_file_handle(State));
 handle_call({length, Q}, _From, State = #dqstate { sequences = Sequences }) ->
     {ReadSeqId, WriteSeqId} = sequence_lookup(Sequences, Q),
     reply(WriteSeqId - ReadSeqId, State);
@@ -521,8 +523,6 @@ handle_cast({requeue_next_n, Q, N}, State) ->
 handle_cast({delete_queue, Q}, State) ->
     {ok, State1} = internal_delete_queue(Q, State),
     noreply(State1);
-handle_cast(filesync, State) ->
-    noreply(sync_current_file_handle(State));
 handle_cast({set_mode, Mode}, State) ->
     noreply((case Mode of
                  disk -> fun to_disk_only_mode/1;
@@ -909,9 +909,11 @@ internal_fetch(Q, ReadMsg, FakeDeliver, Advance,
          end, State1}
     end.
 
-internal_foldl(Q, Fun, Init, State = #dqstate { sequences = Sequences }) ->
+internal_foldl(Q, Fun, Init, State) ->
+    State1 = #dqstate { sequences = Sequences } =
+        sync_current_file_handle(State),
     {ReadSeqId, WriteSeqId} = sequence_lookup(Sequences, Q),
-    internal_foldl(Q, WriteSeqId, Fun, State, Init, ReadSeqId).
+    internal_foldl(Q, WriteSeqId, Fun, State1, Init, ReadSeqId).
 
 internal_foldl(_Q, SeqId, _Fun, State, Acc, SeqId) ->
     {ok, Acc, State};
@@ -1118,11 +1120,12 @@ internal_publish(Q, Message = #basic_message { guid = MsgId },
     {ok, {MsgId, WriteSeqId}, State1}.
 
 internal_tx_cancel(MsgIds, State) ->
+    State1 = sync_current_file_handle(State),
     %% we don't need seq ids because we're not touching mnesia,
     %% because seqids were never assigned
     MsgSeqIds = lists:zip(MsgIds,
                           lists:duplicate(erlang:length(MsgIds), undefined)),
-    remove_messages(undefined, MsgSeqIds, false, State).
+    remove_messages(undefined, MsgSeqIds, false, State1).
 
 internal_requeue(_Q, [], State) ->
     {ok, State};
@@ -1218,8 +1221,9 @@ internal_purge(Q, State = #dqstate { sequences = Sequences }) ->
     end.
 
 internal_delete_queue(Q, State) ->
-    {ok, _Count, State1 = #dqstate { sequences = Sequences }} =
-        internal_purge(Q, State), %% remove everything undelivered
+    State1 = sync_current_file_handle(State),
+    {ok, _Count, State2 = #dqstate { sequences = Sequences }} =
+        internal_purge(Q, State1), %% remove everything undelivered
     true = ets:delete(Sequences, Q),
     %% now remove everything already delivered
     Objs = mnesia:dirty_match_object(
@@ -1233,7 +1237,7 @@ internal_delete_queue(Q, State) ->
           fun (#dq_msg_loc { queue_and_seq_id = {_Q, SeqId},
                              msg_id = MsgId }) ->
                   {MsgId, SeqId} end, Objs),
-    remove_messages(Q, MsgSeqIds, true, State1).
+    remove_messages(Q, MsgSeqIds, true, State2).
 
 internal_delete_non_durable_queues(
   DurableQueues, State = #dqstate { sequences = Sequences }) ->
diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl
index 2e67735f1b..f865b19a9c 100644
--- a/src/rabbit_mixed_queue.erl
+++ b/src/rabbit_mixed_queue.erl
@@ -145,7 +145,7 @@ set_mode(disk, TxnMessages, State =
     %% Note we also batch together messages on disk so that we minimise
     %% the calls to requeue.
     {ok, MsgBuf3} =
-        send_messages_to_disk(IsDurable, Q, MsgBuf1, 0, 0, [], queue:new()),
+        send_messages_to_disk(IsDurable, Q, MsgBuf1, 0, 0, [], [], queue:new()),
     %% tx_publish txn messages. Some of these will have been already
     %% published if they really are durable and persistent which is
    %% why we can't just use our own tx_publish/2 function (would end
@@ -187,11 +187,11 @@ set_mode(mixed, TxnMessages, State = #mqstate { mode = disk, queue = Q,
     {ok, State #mqstate { mode = mixed }}.
 
 send_messages_to_disk(IsDurable, Q, Queue, PublishCount, RequeueCount,
-                      Commit, MsgBuf) ->
+                      Commit, Ack, MsgBuf) ->
     case queue:out(Queue) of
         {empty, _Queue} ->
-            ok = flush_messages_to_disk_queue(Q, Commit),
-            [] = flush_requeue_to_disk_queue(Q, RequeueCount, []),
+            ok = flush_messages_to_disk_queue(Q, Commit, Ack),
+            {[], []} = flush_requeue_to_disk_queue(Q, RequeueCount, [], []),
             {ok, MsgBuf};
         {{value, {Msg = #basic_message { is_persistent = IsPersistent },
                   IsDelivered}}, Queue1} ->
@@ -199,49 +199,51 @@ send_messages_to_disk(IsDurable, Q, Queue, PublishCount, RequeueCount,
                 true -> %% it's already in the Q
                     send_messages_to_disk(
                       IsDurable, Q, Queue1, PublishCount, RequeueCount + 1,
-                      Commit, inc_queue_length(Q, MsgBuf, 1));
+                      Commit, Ack, inc_queue_length(Q, MsgBuf, 1));
                 false ->
                     republish_message_to_disk_queue(
                       IsDurable, Q, Queue1, PublishCount, RequeueCount, Commit,
-                      MsgBuf, Msg, IsDelivered)
+                      Ack, MsgBuf, Msg, IsDelivered)
             end;
-        {{value, {Msg, IsDelivered, _AckTag}}, Queue1} ->
+        {{value, {Msg, IsDelivered, AckTag}}, Queue1} ->
             %% these have come via the prefetcher, so are no longer in
             %% the disk queue so they need to be republished
-            republish_message_to_disk_queue(IsDelivered, Q, Queue1,
-                                            PublishCount, RequeueCount, Commit,
-                                            MsgBuf, Msg, IsDelivered);
+            republish_message_to_disk_queue(
+              IsDurable, Q, Queue1, PublishCount, RequeueCount, Commit,
+              [AckTag | Ack], MsgBuf, Msg, IsDelivered);
         {{value, {Q, Count}}, Queue1} ->
             send_messages_to_disk(IsDurable, Q, Queue1, PublishCount,
-                                  RequeueCount + Count, Commit,
+                                  RequeueCount + Count, Commit, Ack,
                                   inc_queue_length(Q, MsgBuf, Count))
     end.
 
 republish_message_to_disk_queue(IsDurable, Q, Queue, PublishCount, RequeueCount,
-                                Commit, MsgBuf, Msg =
+                                Commit, Ack, MsgBuf, Msg =
                                 #basic_message { guid = MsgId }, IsDelivered) ->
-    Commit1 = flush_requeue_to_disk_queue(Q, RequeueCount, Commit),
+    {Commit1, Ack1} = flush_requeue_to_disk_queue(Q, RequeueCount, Commit, Ack),
     ok = rabbit_disk_queue:tx_publish(Msg),
-    {PublishCount1, Commit2} =
+    {PublishCount1, Commit2, Ack2} =
         case PublishCount == ?TO_DISK_MAX_FLUSH_SIZE of
-            true -> ok = flush_messages_to_disk_queue(Q, Commit1),
-                    {1, [{MsgId, IsDelivered}]};
-            false -> {PublishCount + 1, [{MsgId, IsDelivered} | Commit1]}
+            true -> ok = flush_messages_to_disk_queue(
+                           Q, [{MsgId, IsDelivered} | Commit1], Ack1),
+                    {0, [], []};
+            false -> {PublishCount + 1, [{MsgId, IsDelivered} | Commit1], Ack1}
         end,
     send_messages_to_disk(IsDurable, Q, Queue, PublishCount1, 0,
-                          Commit2, inc_queue_length(Q, MsgBuf, 1)).
+                          Commit2, Ack2, inc_queue_length(Q, MsgBuf, 1)).
 
-flush_messages_to_disk_queue(_Q, []) ->
+flush_messages_to_disk_queue(_Q, [], []) ->
     ok;
-flush_messages_to_disk_queue(Q, Commit) ->
-    rabbit_disk_queue:tx_commit(Q, lists:reverse(Commit), []).
-
-flush_requeue_to_disk_queue(_Q, 0, Commit) ->
-    Commit;
-flush_requeue_to_disk_queue(Q, RequeueCount, Commit) ->
-    ok = flush_messages_to_disk_queue(Q, Commit),
+flush_messages_to_disk_queue(Q, Commit, Ack) ->
+    rabbit_disk_queue:tx_commit(Q, lists:reverse(Commit), Ack).
+
+flush_requeue_to_disk_queue(_Q, 0, Commit, Ack) ->
+    {Commit, Ack};
+flush_requeue_to_disk_queue(Q, RequeueCount, Commit, Ack) ->
+    ok = flush_messages_to_disk_queue(Q, Commit, Ack),
+    ok = rabbit_disk_queue:filesync(),
     ok = rabbit_disk_queue:requeue_next_n(Q, RequeueCount),
-    [].
+    {[], []}.
 
 gain_memory(Inc, State = #mqstate { memory_size = QSize,
                                     memory_gain = Gain }) ->
