summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_disk_queue.erl22
-rw-r--r--src/rabbit_mixed_queue.erl56
2 files changed, 42 insertions, 36 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index e2f341ffd4..c1744d6695 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -354,7 +354,7 @@ to_ram_disk_mode() ->
gen_server2:pcall(?SERVER, 9, to_ram_disk_mode, infinity).
filesync() ->
- gen_server2:pcast(?SERVER, 10, filesync).
+ gen_server2:pcall(?SERVER, 9, filesync).
cache_info() ->
gen_server2:call(?SERVER, cache_info, infinity).
@@ -467,6 +467,8 @@ handle_call({tx_commit, Q, PubMsgIds, AckSeqIds}, From, State) ->
handle_call({purge, Q}, _From, State) ->
{ok, Count, State1} = internal_purge(Q, State),
reply(Count, State1);
+handle_call(filesync, _From, State) ->
+ reply(ok, sync_current_file_handle(State));
handle_call({length, Q}, _From, State = #dqstate { sequences = Sequences }) ->
{ReadSeqId, WriteSeqId} = sequence_lookup(Sequences, Q),
reply(WriteSeqId - ReadSeqId, State);
@@ -521,8 +523,6 @@ handle_cast({requeue_next_n, Q, N}, State) ->
handle_cast({delete_queue, Q}, State) ->
{ok, State1} = internal_delete_queue(Q, State),
noreply(State1);
-handle_cast(filesync, State) ->
- noreply(sync_current_file_handle(State));
handle_cast({set_mode, Mode}, State) ->
noreply((case Mode of
disk -> fun to_disk_only_mode/1;
@@ -909,9 +909,11 @@ internal_fetch(Q, ReadMsg, FakeDeliver, Advance,
end, State1}
end.
-internal_foldl(Q, Fun, Init, State = #dqstate { sequences = Sequences }) ->
+internal_foldl(Q, Fun, Init, State) ->
+ State1 = #dqstate { sequences = Sequences } =
+ sync_current_file_handle(State),
{ReadSeqId, WriteSeqId} = sequence_lookup(Sequences, Q),
- internal_foldl(Q, WriteSeqId, Fun, State, Init, ReadSeqId).
+ internal_foldl(Q, WriteSeqId, Fun, State1, Init, ReadSeqId).
internal_foldl(_Q, SeqId, _Fun, State, Acc, SeqId) ->
{ok, Acc, State};
@@ -1118,11 +1120,12 @@ internal_publish(Q, Message = #basic_message { guid = MsgId },
{ok, {MsgId, WriteSeqId}, State1}.
internal_tx_cancel(MsgIds, State) ->
+ State1 = sync_current_file_handle(State),
%% we don't need seq ids because we're not touching mnesia,
%% because seqids were never assigned
MsgSeqIds = lists:zip(MsgIds, lists:duplicate(erlang:length(MsgIds),
undefined)),
- remove_messages(undefined, MsgSeqIds, false, State).
+ remove_messages(undefined, MsgSeqIds, false, State1).
internal_requeue(_Q, [], State) ->
{ok, State};
@@ -1218,8 +1221,9 @@ internal_purge(Q, State = #dqstate { sequences = Sequences }) ->
end.
internal_delete_queue(Q, State) ->
- {ok, _Count, State1 = #dqstate { sequences = Sequences }} =
- internal_purge(Q, State), %% remove everything undelivered
+ State1 = sync_current_file_handle(State),
+ {ok, _Count, State2 = #dqstate { sequences = Sequences }} =
+ internal_purge(Q, State1), %% remove everything undelivered
true = ets:delete(Sequences, Q),
%% now remove everything already delivered
Objs = mnesia:dirty_match_object(
@@ -1233,7 +1237,7 @@ internal_delete_queue(Q, State) ->
fun (#dq_msg_loc { queue_and_seq_id = {_Q, SeqId},
msg_id = MsgId }) ->
{MsgId, SeqId} end, Objs),
- remove_messages(Q, MsgSeqIds, true, State1).
+ remove_messages(Q, MsgSeqIds, true, State2).
internal_delete_non_durable_queues(
DurableQueues, State = #dqstate { sequences = Sequences }) ->
diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl
index 2e67735f1b..f865b19a9c 100644
--- a/src/rabbit_mixed_queue.erl
+++ b/src/rabbit_mixed_queue.erl
@@ -145,7 +145,7 @@ set_mode(disk, TxnMessages, State =
%% Note we also batch together messages on disk so that we minimise
%% the calls to requeue.
{ok, MsgBuf3} =
- send_messages_to_disk(IsDurable, Q, MsgBuf1, 0, 0, [], queue:new()),
+ send_messages_to_disk(IsDurable, Q, MsgBuf1, 0, 0, [], [], queue:new()),
%% tx_publish txn messages. Some of these will have been already
%% published if they really are durable and persistent which is
%% why we can't just use our own tx_publish/2 function (would end
@@ -187,11 +187,11 @@ set_mode(mixed, TxnMessages, State = #mqstate { mode = disk, queue = Q,
{ok, State #mqstate { mode = mixed }}.
send_messages_to_disk(IsDurable, Q, Queue, PublishCount, RequeueCount,
- Commit, MsgBuf) ->
+ Commit, Ack, MsgBuf) ->
case queue:out(Queue) of
{empty, _Queue} ->
- ok = flush_messages_to_disk_queue(Q, Commit),
- [] = flush_requeue_to_disk_queue(Q, RequeueCount, []),
+ ok = flush_messages_to_disk_queue(Q, Commit, Ack),
+ {[], []} = flush_requeue_to_disk_queue(Q, RequeueCount, [], []),
{ok, MsgBuf};
{{value, {Msg = #basic_message { is_persistent = IsPersistent },
IsDelivered}}, Queue1} ->
@@ -199,49 +199,51 @@ send_messages_to_disk(IsDurable, Q, Queue, PublishCount, RequeueCount,
true -> %% it's already in the Q
send_messages_to_disk(
IsDurable, Q, Queue1, PublishCount, RequeueCount + 1,
- Commit, inc_queue_length(Q, MsgBuf, 1));
+ Commit, Ack, inc_queue_length(Q, MsgBuf, 1));
false ->
republish_message_to_disk_queue(
IsDurable, Q, Queue1, PublishCount, RequeueCount, Commit,
- MsgBuf, Msg, IsDelivered)
+ Ack, MsgBuf, Msg, IsDelivered)
end;
- {{value, {Msg, IsDelivered, _AckTag}}, Queue1} ->
+ {{value, {Msg, IsDelivered, AckTag}}, Queue1} ->
%% these have come via the prefetcher, so are no longer in
%% the disk queue so they need to be republished
- republish_message_to_disk_queue(IsDelivered, Q, Queue1,
- PublishCount, RequeueCount, Commit,
- MsgBuf, Msg, IsDelivered);
+ republish_message_to_disk_queue(
+ IsDurable, Q, Queue1, PublishCount, RequeueCount, Commit,
+ [AckTag | Ack], MsgBuf, Msg, IsDelivered);
{{value, {Q, Count}}, Queue1} ->
send_messages_to_disk(IsDurable, Q, Queue1, PublishCount,
- RequeueCount + Count, Commit,
+ RequeueCount + Count, Commit, Ack,
inc_queue_length(Q, MsgBuf, Count))
end.
republish_message_to_disk_queue(IsDurable, Q, Queue, PublishCount, RequeueCount,
- Commit, MsgBuf, Msg =
+ Commit, Ack, MsgBuf, Msg =
#basic_message { guid = MsgId }, IsDelivered) ->
- Commit1 = flush_requeue_to_disk_queue(Q, RequeueCount, Commit),
+ {Commit1, Ack1} = flush_requeue_to_disk_queue(Q, RequeueCount, Commit, Ack),
ok = rabbit_disk_queue:tx_publish(Msg),
- {PublishCount1, Commit2} =
+ {PublishCount1, Commit2, Ack2} =
case PublishCount == ?TO_DISK_MAX_FLUSH_SIZE of
- true -> ok = flush_messages_to_disk_queue(Q, Commit1),
- {1, [{MsgId, IsDelivered}]};
- false -> {PublishCount + 1, [{MsgId, IsDelivered} | Commit1]}
+ true -> ok = flush_messages_to_disk_queue(
+ Q, [{MsgId, IsDelivered} | Commit1], Ack1),
+ {0, [], []};
+ false -> {PublishCount + 1, [{MsgId, IsDelivered} | Commit1], Ack1}
end,
send_messages_to_disk(IsDurable, Q, Queue, PublishCount1, 0,
- Commit2, inc_queue_length(Q, MsgBuf, 1)).
+ Commit2, Ack2, inc_queue_length(Q, MsgBuf, 1)).
-flush_messages_to_disk_queue(_Q, []) ->
+flush_messages_to_disk_queue(_Q, [], []) ->
ok;
-flush_messages_to_disk_queue(Q, Commit) ->
- rabbit_disk_queue:tx_commit(Q, lists:reverse(Commit), []).
-
-flush_requeue_to_disk_queue(_Q, 0, Commit) ->
- Commit;
-flush_requeue_to_disk_queue(Q, RequeueCount, Commit) ->
- ok = flush_messages_to_disk_queue(Q, Commit),
+flush_messages_to_disk_queue(Q, Commit, Ack) ->
+ rabbit_disk_queue:tx_commit(Q, lists:reverse(Commit), Ack).
+
+flush_requeue_to_disk_queue(_Q, 0, Commit, Ack) ->
+ {Commit, Ack};
+flush_requeue_to_disk_queue(Q, RequeueCount, Commit, Ack) ->
+ ok = flush_messages_to_disk_queue(Q, Commit, Ack),
+ ok = rabbit_disk_queue:filesync(),
ok = rabbit_disk_queue:requeue_next_n(Q, RequeueCount),
- [].
+ {[], []}.
gain_memory(Inc, State = #mqstate { memory_size = QSize,
memory_gain = Gain }) ->