diff options
| -rw-r--r-- | src/rabbit_disk_queue.erl | 83 | ||||
| -rw-r--r-- | src/rabbit_misc.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 29 |
3 files changed, 89 insertions, 25 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index da25e52495..2ffb9a754d 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -40,7 +40,9 @@ -export([publish/3, publish_with_seq/4, deliver/1, phantom_deliver/1, ack/2, tx_publish/2, tx_commit/3, tx_commit_with_seqs/3, tx_cancel/1, - requeue/2, requeue_with_seqs/2, purge/1, delete_queue/1]). + requeue/2, requeue_with_seqs/2, purge/1, delete_queue/1, + dump_queue/1 + ]). -export([length/1, is_empty/1]). @@ -245,6 +247,8 @@ -spec(requeue/2 :: (queue_name(), [{msg_id(), seq_id()}]) -> 'ok'). -spec(requeue_with_seqs/2 :: (queue_name(), [{{msg_id(), seq_id()}, seq_id_or_next()}]) -> 'ok'). -spec(purge/1 :: (queue_name()) -> non_neg_integer()). +-spec(dump_queue/1 :: (queue_name()) -> [{msg_id(), binary(), non_neg_integer(), + bool(), {msg_id(), seq_id()}}]). -spec(stop/0 :: () -> 'ok'). -spec(stop_and_obliterate/0 :: () -> 'ok'). -spec(to_ram_disk_mode/0 :: () -> 'ok'). @@ -300,6 +304,9 @@ purge(Q) -> delete_queue(Q) -> gen_server2:cast(?SERVER, {delete_queue, Q}). +dump_queue(Q) -> + gen_server2:call(?SERVER, {dump_queue, Q}, infinity). + stop() -> gen_server2:call(?SERVER, stop, infinity). @@ -439,7 +446,10 @@ handle_call({length, Q}, _From, State = #dqstate { sequences = Sequences }) -> case ets:lookup(Sequences, Q) of [] -> {reply, 0, State}; [{Q, _ReadSeqId, _WriteSeqId, Length}] -> {reply, Length, State} - end. + end; +handle_call({dump_queue, Q}, _From, State) -> + {Result, State1} = internal_dump_queue(Q, State), + {reply, Result, State1}. handle_cast({publish, Q, MsgId, MsgBody}, State) -> {ok, State1} = internal_publish(Q, MsgId, next, MsgBody, State), @@ -611,28 +621,37 @@ internal_deliver(Q, ReadMsg, State = #dqstate { sequences = Sequences }) -> [] -> {ok, empty, State}; [{Q, SeqId, SeqId, 0}] -> {ok, empty, State}; [{Q, ReadSeqId, WriteSeqId, Length}] when Length > 0 -> - [Obj = - #dq_msg_loc {is_delivered = Delivered, msg_id = MsgId, - next_seq_id = ReadSeqId2}] = - mnesia:dirty_read(rabbit_disk_queue, {Q, ReadSeqId}), - [{MsgId, _RefCount, File, Offset, TotalSize}] = - dets_ets_lookup(State, MsgId), Remaining = Length - 1, - true = ets:insert(Sequences, {Q, ReadSeqId2, WriteSeqId, Remaining}), - ok = - if Delivered -> ok; - true -> - mnesia:dirty_write(rabbit_disk_queue, - Obj #dq_msg_loc {is_delivered = true}) - end, - if ReadMsg -> - {FileHdl, State1} = get_read_handle(File, State), - {ok, {MsgBody, BodySize}} = - read_message_at_offset(FileHdl, Offset, TotalSize), - {ok, {MsgId, MsgBody, BodySize, Delivered, {MsgId, ReadSeqId}, Remaining}, - State1}; - true -> {ok, {MsgId, Delivered, {MsgId, ReadSeqId}, Remaining}, State} - end + {ok, Result, NextReadSeqId, State1} = internal_read_message(Q, ReadSeqId, false, ReadMsg, State), + true = ets:insert(Sequences, {Q, NextReadSeqId, WriteSeqId, Remaining}), + {ok, case Result of + {MsgId, Delivered, {MsgId, ReadSeqId}} -> + {MsgId, Delivered, {MsgId, ReadSeqId}, Remaining}; + {MsgId, MsgBody, BodySize, Delivered, {MsgId, ReadSeqId}} -> + {MsgId, MsgBody, BodySize, Delivered, {MsgId, ReadSeqId}, Remaining} + end, State1} + end. + +internal_read_message(Q, ReadSeqId, FakeDeliver, ReadMsg, State) -> + [Obj = + #dq_msg_loc {is_delivered = Delivered, msg_id = MsgId, + next_seq_id = NextReadSeqId}] = + mnesia:dirty_read(rabbit_disk_queue, {Q, ReadSeqId}), + [{MsgId, _RefCount, File, Offset, TotalSize}] = + dets_ets_lookup(State, MsgId), + ok = + if FakeDeliver orelse Delivered -> ok; + true -> + mnesia:dirty_write(rabbit_disk_queue, + Obj #dq_msg_loc {is_delivered = true}) + end, + if ReadMsg -> + {FileHdl, State1} = get_read_handle(File, State), + {ok, {MsgBody, BodySize}} = + read_message_at_offset(FileHdl, Offset, TotalSize), + {ok, {MsgId, MsgBody, BodySize, Delivered, {MsgId, ReadSeqId}}, + NextReadSeqId, State1}; + true -> {ok, {MsgId, Delivered, {MsgId, ReadSeqId}}, NextReadSeqId, State} end. internal_ack(Q, MsgSeqIds, State) -> @@ -863,7 +882,7 @@ internal_purge(Q, State = #dqstate { sequences = Sequences }) -> mnesia:transaction( fun() -> ok = mnesia:write_lock_table(rabbit_disk_queue), - MsgSeqIds = + {MsgSeqIds, WriteSeqId} = rabbit_misc:unfold( fun (SeqId) when SeqId == WriteSeqId -> false; (SeqId) -> @@ -900,6 +919,22 @@ internal_delete_queue(Q, State) -> end), {ok, State2}. +internal_dump_queue(Q, State = #dqstate { sequences = Sequences }) -> + case ets:lookup(Sequences, Q) of + [] -> {[], State}; + [{Q, ReadSeq, WriteSeq, _Length}] -> + {QList, {WriteSeq, State3}} = + rabbit_misc:unfold( + fun ({SeqId, _State1}) when SeqId == WriteSeq -> + false; + ({SeqId, State1}) -> + {ok, Result, NextReadSeqId, State2} = + internal_read_message(Q, SeqId, true, true, State1), + {true, Result, {NextReadSeqId, State2}} + end, {ReadSeq, State}), + {lists:reverse(QList), State3} + end. + %% ---- ROLLING OVER THE APPEND FILE ---- maybe_roll_to_new_file(Offset, diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 49faba293b..acadf2a0cc 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -429,5 +429,5 @@ unfold(Fun, Init) -> unfold(Fun, Acc, Init) -> case Fun(Init) of {true, E, I} -> unfold(Fun, [E|Acc], I); - false -> Acc + false -> {Acc, Init} end. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 46c641fc93..75b36d6f3d 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -703,6 +703,7 @@ test_disk_queue() -> passed = rdq_test_startup_with_queue_gaps(), passed = rdq_test_redeliver(), passed = rdq_test_purge(), + passed = rdq_test_dump_queue(), passed. rdq_time_tx_publish_commit_deliver_ack(Qs, MsgCount, MsgSizeBytes) -> @@ -898,6 +899,34 @@ rdq_test_purge() -> rdq_stop(), passed. +rdq_test_dump_queue() -> + rdq_virgin(), + rdq_start(), + Msg = <<0:(8*256)>>, + Total = 1000, + All = lists:seq(1,Total), + [rabbit_disk_queue:tx_publish(N, Msg) || N <- All], + rabbit_disk_queue:tx_commit(q, All, []), + io:format("Publish done~n", []), + QList = [{N, Msg, 256, false, {N, (N-1)}} || N <- All], + QList = rabbit_disk_queue:dump_queue(q), + rdq_stop(), + io:format("dump ok undelivered~n", []), + rdq_start(), + lists:foreach( + fun (N) -> + Remaining = Total - N, + {N, Msg, 256, false, SeqId, Remaining} = rabbit_disk_queue:deliver(q) + end, All), + [] = rabbit_disk_queue:dump_queue(q), + rdq_stop(), + io:format("dump ok post delivery~n", []), + rdq_start(), + QList2 = [{N, Msg, 256, true, {N, (N-1)}} || N <- All], + QList2 = rabbit_disk_queue:dump_queue(q), + io:format("dump ok post delivery + restart~n", []), + passed. + rdq_time_commands(Funcs) -> lists:foreach(fun (F) -> F() end, Funcs). |
