summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_disk_queue.erl36
-rw-r--r--src/rabbit_mixed_queue.erl43
-rw-r--r--src/rabbit_tests.erl34
3 files changed, 29 insertions, 84 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 990e5917d7..dc328792a0 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -41,7 +41,7 @@
-export([publish/3, deliver/1, phantom_deliver/1, ack/2,
tx_publish/1, tx_commit/3, tx_cancel/1,
requeue/2, requeue_with_seqs/2, purge/1, delete_queue/1,
- dump_queue/1, delete_non_durable_queues/1, auto_ack_next_message/1
+ delete_non_durable_queues/1, auto_ack_next_message/1
]).
-export([length/1, filesync/0, cache_info/0]).
@@ -259,7 +259,6 @@
(queue_name(),
[{{msg_id(), seq_id()}, {seq_id_or_next(), bool()}}]) -> 'ok').
-spec(purge/1 :: (queue_name()) -> non_neg_integer()).
--spec(dump_queue/1 :: (queue_name()) -> [{msg_id(), bool()}]).
-spec(delete_non_durable_queues/1 :: (set()) -> 'ok').
-spec(stop/0 :: () -> 'ok').
-spec(stop_and_obliterate/0 :: () -> 'ok').
@@ -315,9 +314,6 @@ purge(Q) ->
delete_queue(Q) ->
gen_server2:cast(?SERVER, {delete_queue, Q}).
-dump_queue(Q) ->
- gen_server2:call(?SERVER, {dump_queue, Q}, infinity).
-
delete_non_durable_queues(DurableQueues) ->
gen_server2:call(?SERVER, {delete_non_durable_queues, DurableQueues},
infinity).
@@ -463,9 +459,6 @@ handle_call(to_ram_disk_mode, _From, State) ->
handle_call({length, Q}, _From, State = #dqstate { sequences = Sequences }) ->
{_ReadSeqId, _WriteSeqId, Length} = sequence_lookup(Sequences, Q),
reply(Length, State);
-handle_call({dump_queue, Q}, _From, State) ->
- {Result, State1} = internal_dump_queue(Q, State),
- reply(Result, State1);
handle_call({delete_non_durable_queues, DurableQueues}, _From, State) ->
{ok, State1} = internal_delete_non_durable_queues(DurableQueues, State),
reply(ok, State1);
@@ -1157,33 +1150,6 @@ internal_delete_queue(Q, State) ->
end),
{ok, State2}.
-internal_dump_queue(Q, State = #dqstate { sequences = Sequences }) ->
- case ets:lookup(Sequences, Q) of
- [] -> {[], State};
- [{Q, ReadSeq, WriteSeq, _Length}] ->
- Objs =
- mnesia:dirty_match_object(
- rabbit_disk_queue,
- #dq_msg_loc { queue_and_seq_id = {Q, '_'},
- msg_id = '_',
- is_delivered = '_',
- next_seq_id = '_'
- }),
- {Msgs, WriteSeq} =
- lists:foldl(
- fun (#dq_msg_loc { queue_and_seq_id = {_, Seq},
- msg_id = MsgId,
- is_delivered = Delivered,
- next_seq_id = NextSeq },
- {Acc, Seq}) ->
- {[{MsgId, Delivered} | Acc], NextSeq};
- (#dq_msg_loc { queue_and_seq_id = {_, Seq} },
- {[], RSeq}) when Seq < RSeq ->
- {[], RSeq}
- end, {[], ReadSeq}, lists:keysort(2, Objs)),
- {lists:reverse(Msgs), State}
- end.
-
internal_delete_non_durable_queues(
DurableQueues, State = #dqstate { sequences = Sequences }) ->
ets:foldl(
diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl
index 23696f27fc..f6ef355d9b 100644
--- a/src/rabbit_mixed_queue.erl
+++ b/src/rabbit_mixed_queue.erl
@@ -144,14 +144,20 @@ to_disk_only_mode(TxnMessages, State =
ok = rabbit_disk_queue:tx_publish(Msg),
{[], [ MsgId | TxPublishAcc ]}
end;
- ({MsgId, IsDelivered}, {RQueueAcc, TxPublishAcc}) ->
+ ({disk, Count}, {RQueueAcc, TxPublishAcc}) ->
ok = if [] == TxPublishAcc -> ok;
true -> rabbit_disk_queue:tx_commit(Q, TxPublishAcc,
[])
end,
- {MsgId, IsDelivered, AckTag, _PersistRemaining} =
- rabbit_disk_queue:phantom_deliver(Q),
- {[ {AckTag, {next, IsDelivered}} | RQueueAcc ], []}
+ {RQueueAcc1, 0} =
+ rabbit_misc:unfold(
+ fun (0) -> false;
+ (N) ->
+ {_MsgId, IsDelivered, AckTag, _PersistRemaining}
+ = rabbit_disk_queue:phantom_deliver(Q),
+ {true, {AckTag, {next, IsDelivered}}, N - 1}
+ end, Count),
+ {RQueueAcc1 ++ RQueueAcc, []}
end, {[], []}, Msgs),
ok = if [] == TxPublish -> ok;
true -> rabbit_disk_queue:tx_commit(Q, TxPublish, [])
@@ -179,11 +185,13 @@ to_mixed_mode(TxnMessages, State =
#mqstate { mode = disk, queue = Q, length = Length,
is_durable = IsDurable }) ->
rabbit_log:info("Converting queue to mixed mode: ~p~n", [Q]),
- %% load up a new queue with everything that's on disk.
- %% don't remove non-persistent messages that happen to be on disk
- QList = rabbit_disk_queue:dump_queue(Q),
- Length = erlang:length(QList),
- MsgBuf = queue:from_list(QList),
+ %% load up a new queue with a token that says how many messages
+ %% are on disk
+ %% don't actually do anything to the disk
+ MsgBuf = case Length of
+ 0 -> queue:new();
+ _ -> queue:from_list([{disk, Length}])
+ end,
%% remove txn messages from disk which are neither persistent and
%% durable. This is necessary to avoid leaks. This is also pretty
%% much the inverse behaviour of our own tx_cancel/2 which is why
@@ -300,7 +308,7 @@ deliver(State = #mqstate { mode = mixed, msg_buf = MsgBuf, queue = Q,
is_durable = IsDurable, length = Length }) ->
{{value, Value}, MsgBuf1}
= queue:out(MsgBuf),
- {Msg, IsDelivered, AckTag} =
+ {Msg, IsDelivered, AckTag, MsgBuf2} =
case Value of
{Msg1 = #basic_message { guid = MsgId,
is_persistent = IsPersistent },
@@ -320,10 +328,9 @@ deliver(State = #mqstate { mode = mixed, msg_buf = MsgBuf, queue = Q,
end;
false -> noack
end,
- {Msg1, IsDelivered1, AckTag1};
- {MsgId, IsDelivered1} ->
- {Msg1 = #basic_message { guid = MsgId,
- is_persistent = IsPersistent },
+ {Msg1, IsDelivered1, AckTag1, MsgBuf1};
+ {disk, Rem1} ->
+ {Msg1 = #basic_message { is_persistent = IsPersistent },
_Size, IsDelivered1, AckTag1, _PersistRem}
= rabbit_disk_queue:deliver(Q),
AckTag2 =
@@ -332,11 +339,15 @@ deliver(State = #mqstate { mode = mixed, msg_buf = MsgBuf, queue = Q,
false -> rabbit_disk_queue:ack(Q, [AckTag1]),
noack
end,
- {Msg1, IsDelivered1, AckTag2}
+ MsgBuf3 = case Rem1 of
+ 1 -> MsgBuf1;
+ _ -> queue:in_r({disk, Rem1 - 1}, MsgBuf1)
+ end,
+ {Msg1, IsDelivered1, AckTag2, MsgBuf3}
end,
Rem = Length - 1,
{{Msg, IsDelivered, AckTag, Rem},
- State #mqstate { msg_buf = MsgBuf1, length = Rem }}.
+ State #mqstate { msg_buf = MsgBuf2, length = Rem }}.
remove_noacks(MsgsWithAcks) ->
{AckTags, ASize} =
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 0b70be0c9b..b56d71c8c2 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -690,11 +690,10 @@ delete_log_handlers(Handlers) ->
test_disk_queue() ->
rdq_stop(),
rdq_virgin(),
- passed = rdq_stress_gc(10000),
+ passed = rdq_stress_gc(1000),
passed = rdq_test_startup_with_queue_gaps(),
passed = rdq_test_redeliver(),
passed = rdq_test_purge(),
- passed = rdq_test_dump_queue(1000),
passed = rdq_test_mixed_queue_modes(),
passed = rdq_test_mode_conversion_mid_txn(),
rdq_virgin(),
@@ -938,37 +937,6 @@ rdq_test_purge() ->
rdq_stop(),
passed.
-rdq_test_dump_queue(Total) ->
- rdq_virgin(),
- rdq_start(),
- Msg = <<0:(8*256)>>,
- All = lists:seq(1,Total),
- [rabbit_disk_queue:tx_publish(rdq_message(N, Msg)) || N <- All],
- rabbit_disk_queue:tx_commit(q, All, []),
- io:format("Publish done~n", []),
- QList = [{N, false} || N <- All],
- {Micros, QList} = timer:tc(rabbit_disk_queue, dump_queue, [q]),
- rdq_stop(),
- io:format("dump ok undelivered (~w micros)~n", [Micros]),
- {Micros1, _} = timer:tc(rabbit_tests, rdq_start, []),
- io:format("restarted (~w micros)~n", [Micros1]),
- lists:foreach(
- fun (N) ->
- Remaining = Total - N,
- {Message, _TSize, false, _SeqId, Remaining} =
- rabbit_disk_queue:deliver(q),
- ok = rdq_match_message(Message, N, Msg, 256)
- end, All),
- [] = rabbit_disk_queue:dump_queue(q),
- rdq_stop(),
- io:format("dump ok post delivery~n", []),
- rdq_start(),
- QList2 = [{N, true} || N <- All],
- {Micros2, QList2} = timer:tc(rabbit_disk_queue, dump_queue, [q]),
- io:format("dump ok post delivery + restart (~w micros)~n", [Micros2]),
- rdq_stop(),
- passed.
-
rdq_test_mixed_queue_modes() ->
rdq_virgin(),
rdq_start(),