summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_disk_queue.erl83
-rw-r--r--src/rabbit_misc.erl2
-rw-r--r--src/rabbit_tests.erl29
3 files changed, 89 insertions, 25 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index da25e52495..2ffb9a754d 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -40,7 +40,9 @@
-export([publish/3, publish_with_seq/4, deliver/1, phantom_deliver/1, ack/2,
tx_publish/2, tx_commit/3, tx_commit_with_seqs/3, tx_cancel/1,
- requeue/2, requeue_with_seqs/2, purge/1, delete_queue/1]).
+ requeue/2, requeue_with_seqs/2, purge/1, delete_queue/1,
+ dump_queue/1
+ ]).
-export([length/1, is_empty/1]).
@@ -245,6 +247,8 @@
-spec(requeue/2 :: (queue_name(), [{msg_id(), seq_id()}]) -> 'ok').
-spec(requeue_with_seqs/2 :: (queue_name(), [{{msg_id(), seq_id()}, seq_id_or_next()}]) -> 'ok').
-spec(purge/1 :: (queue_name()) -> non_neg_integer()).
+-spec(dump_queue/1 :: (queue_name()) -> [{msg_id(), binary(), non_neg_integer(),
+ bool(), {msg_id(), seq_id()}}]).
-spec(stop/0 :: () -> 'ok').
-spec(stop_and_obliterate/0 :: () -> 'ok').
-spec(to_ram_disk_mode/0 :: () -> 'ok').
@@ -300,6 +304,9 @@ purge(Q) ->
delete_queue(Q) ->
gen_server2:cast(?SERVER, {delete_queue, Q}).
+dump_queue(Q) ->
+ gen_server2:call(?SERVER, {dump_queue, Q}, infinity).
+
stop() ->
gen_server2:call(?SERVER, stop, infinity).
@@ -439,7 +446,10 @@ handle_call({length, Q}, _From, State = #dqstate { sequences = Sequences }) ->
case ets:lookup(Sequences, Q) of
[] -> {reply, 0, State};
[{Q, _ReadSeqId, _WriteSeqId, Length}] -> {reply, Length, State}
- end.
+ end;
+handle_call({dump_queue, Q}, _From, State) ->
+ {Result, State1} = internal_dump_queue(Q, State),
+ {reply, Result, State1}.
handle_cast({publish, Q, MsgId, MsgBody}, State) ->
{ok, State1} = internal_publish(Q, MsgId, next, MsgBody, State),
@@ -611,28 +621,37 @@ internal_deliver(Q, ReadMsg, State = #dqstate { sequences = Sequences }) ->
[] -> {ok, empty, State};
[{Q, SeqId, SeqId, 0}] -> {ok, empty, State};
[{Q, ReadSeqId, WriteSeqId, Length}] when Length > 0 ->
- [Obj =
- #dq_msg_loc {is_delivered = Delivered, msg_id = MsgId,
- next_seq_id = ReadSeqId2}] =
- mnesia:dirty_read(rabbit_disk_queue, {Q, ReadSeqId}),
- [{MsgId, _RefCount, File, Offset, TotalSize}] =
- dets_ets_lookup(State, MsgId),
Remaining = Length - 1,
- true = ets:insert(Sequences, {Q, ReadSeqId2, WriteSeqId, Remaining}),
- ok =
- if Delivered -> ok;
- true ->
- mnesia:dirty_write(rabbit_disk_queue,
- Obj #dq_msg_loc {is_delivered = true})
- end,
- if ReadMsg ->
- {FileHdl, State1} = get_read_handle(File, State),
- {ok, {MsgBody, BodySize}} =
- read_message_at_offset(FileHdl, Offset, TotalSize),
- {ok, {MsgId, MsgBody, BodySize, Delivered, {MsgId, ReadSeqId}, Remaining},
- State1};
- true -> {ok, {MsgId, Delivered, {MsgId, ReadSeqId}, Remaining}, State}
- end
+ {ok, Result, NextReadSeqId, State1} = internal_read_message(Q, ReadSeqId, false, ReadMsg, State),
+ true = ets:insert(Sequences, {Q, NextReadSeqId, WriteSeqId, Remaining}),
+ {ok, case Result of
+ {MsgId, Delivered, {MsgId, ReadSeqId}} ->
+ {MsgId, Delivered, {MsgId, ReadSeqId}, Remaining};
+ {MsgId, MsgBody, BodySize, Delivered, {MsgId, ReadSeqId}} ->
+ {MsgId, MsgBody, BodySize, Delivered, {MsgId, ReadSeqId}, Remaining}
+ end, State1}
+ end.
+
+internal_read_message(Q, ReadSeqId, FakeDeliver, ReadMsg, State) ->
+ [Obj =
+ #dq_msg_loc {is_delivered = Delivered, msg_id = MsgId,
+ next_seq_id = NextReadSeqId}] =
+ mnesia:dirty_read(rabbit_disk_queue, {Q, ReadSeqId}),
+ [{MsgId, _RefCount, File, Offset, TotalSize}] =
+ dets_ets_lookup(State, MsgId),
+ ok =
+ if FakeDeliver orelse Delivered -> ok;
+ true ->
+ mnesia:dirty_write(rabbit_disk_queue,
+ Obj #dq_msg_loc {is_delivered = true})
+ end,
+ if ReadMsg ->
+ {FileHdl, State1} = get_read_handle(File, State),
+ {ok, {MsgBody, BodySize}} =
+ read_message_at_offset(FileHdl, Offset, TotalSize),
+ {ok, {MsgId, MsgBody, BodySize, Delivered, {MsgId, ReadSeqId}},
+ NextReadSeqId, State1};
+ true -> {ok, {MsgId, Delivered, {MsgId, ReadSeqId}}, NextReadSeqId, State}
end.
internal_ack(Q, MsgSeqIds, State) ->
@@ -863,7 +882,7 @@ internal_purge(Q, State = #dqstate { sequences = Sequences }) ->
mnesia:transaction(
fun() ->
ok = mnesia:write_lock_table(rabbit_disk_queue),
- MsgSeqIds =
+ {MsgSeqIds, WriteSeqId} =
rabbit_misc:unfold(
fun (SeqId) when SeqId == WriteSeqId -> false;
(SeqId) ->
@@ -900,6 +919,22 @@ internal_delete_queue(Q, State) ->
end),
{ok, State2}.
+internal_dump_queue(Q, State = #dqstate { sequences = Sequences }) ->
+ case ets:lookup(Sequences, Q) of
+ [] -> {[], State};
+ [{Q, ReadSeq, WriteSeq, _Length}] ->
+ {QList, {WriteSeq, State3}} =
+ rabbit_misc:unfold(
+ fun ({SeqId, _State1}) when SeqId == WriteSeq ->
+ false;
+ ({SeqId, State1}) ->
+ {ok, Result, NextReadSeqId, State2} =
+ internal_read_message(Q, SeqId, true, true, State1),
+ {true, Result, {NextReadSeqId, State2}}
+ end, {ReadSeq, State}),
+ {lists:reverse(QList), State3}
+ end.
+
%% ---- ROLLING OVER THE APPEND FILE ----
maybe_roll_to_new_file(Offset,
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 49faba293b..acadf2a0cc 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -429,5 +429,5 @@ unfold(Fun, Init) ->
unfold(Fun, Acc, Init) ->
case Fun(Init) of
{true, E, I} -> unfold(Fun, [E|Acc], I);
- false -> Acc
+ false -> {Acc, Init}
end.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 46c641fc93..75b36d6f3d 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -703,6 +703,7 @@ test_disk_queue() ->
passed = rdq_test_startup_with_queue_gaps(),
passed = rdq_test_redeliver(),
passed = rdq_test_purge(),
+ passed = rdq_test_dump_queue(),
passed.
rdq_time_tx_publish_commit_deliver_ack(Qs, MsgCount, MsgSizeBytes) ->
@@ -898,6 +899,34 @@ rdq_test_purge() ->
rdq_stop(),
passed.
+rdq_test_dump_queue() ->
+ rdq_virgin(),
+ rdq_start(),
+ Msg = <<0:(8*256)>>,
+ Total = 1000,
+ All = lists:seq(1,Total),
+ [rabbit_disk_queue:tx_publish(N, Msg) || N <- All],
+ rabbit_disk_queue:tx_commit(q, All, []),
+ io:format("Publish done~n", []),
+ QList = [{N, Msg, 256, false, {N, (N-1)}} || N <- All],
+ QList = rabbit_disk_queue:dump_queue(q),
+ rdq_stop(),
+ io:format("dump ok undelivered~n", []),
+ rdq_start(),
+ lists:foreach(
+ fun (N) ->
+ Remaining = Total - N,
+ {N, Msg, 256, false, SeqId, Remaining} = rabbit_disk_queue:deliver(q)
+ end, All),
+ [] = rabbit_disk_queue:dump_queue(q),
+ rdq_stop(),
+ io:format("dump ok post delivery~n", []),
+ rdq_start(),
+ QList2 = [{N, Msg, 256, true, {N, (N-1)}} || N <- All],
+ QList2 = rabbit_disk_queue:dump_queue(q),
+ io:format("dump ok post delivery + restart~n", []),
+ passed.
+
rdq_time_commands(Funcs) ->
lists:foreach(fun (F) -> F() end, Funcs).