diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-06-04 16:14:05 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-06-04 16:14:05 +0100 |
| commit | c310db59b9098ff7632c0f4ddaf736df59ef5646 (patch) | |
| tree | 68d8e3e3fee034141e424617f325fd598dcdab12 | |
| parent | c384ec140a7c32631f2df37d39d6ad1bf50dc020 (diff) | |
| download | rabbitmq-server-git-c310db59b9098ff7632c0f4ddaf736df59ef5646.tar.gz | |
added dump_queue to the disk queue. This spits out a list of the queue contents from the current read pointer to the end of the queue (i.e. messages for which we are waiting for acks will not be included). Of course, at startup, all the read pointers are at the start of the queue (i.e. not waiting for any acks) so this grabs everything. Some minor refactoring was involved in the addition of this function. Also, I needed to change my definition of unfold so that it's now both simultaneously an anamorphism and a catamorphism instead of just an anamorphism (i.e. the accumulator / 'initial' value is spat out at the end). This could be a hylomorphism. Can't remember...
| -rw-r--r-- | src/rabbit_disk_queue.erl | 83 | ||||
| -rw-r--r-- | src/rabbit_misc.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 29 |
3 files changed, 89 insertions, 25 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index da25e52495..2ffb9a754d 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -40,7 +40,9 @@ -export([publish/3, publish_with_seq/4, deliver/1, phantom_deliver/1, ack/2, tx_publish/2, tx_commit/3, tx_commit_with_seqs/3, tx_cancel/1, - requeue/2, requeue_with_seqs/2, purge/1, delete_queue/1]). + requeue/2, requeue_with_seqs/2, purge/1, delete_queue/1, + dump_queue/1 + ]). -export([length/1, is_empty/1]). @@ -245,6 +247,8 @@ -spec(requeue/2 :: (queue_name(), [{msg_id(), seq_id()}]) -> 'ok'). -spec(requeue_with_seqs/2 :: (queue_name(), [{{msg_id(), seq_id()}, seq_id_or_next()}]) -> 'ok'). -spec(purge/1 :: (queue_name()) -> non_neg_integer()). +-spec(dump_queue/1 :: (queue_name()) -> [{msg_id(), binary(), non_neg_integer(), + bool(), {msg_id(), seq_id()}}]). -spec(stop/0 :: () -> 'ok'). -spec(stop_and_obliterate/0 :: () -> 'ok'). -spec(to_ram_disk_mode/0 :: () -> 'ok'). @@ -300,6 +304,9 @@ purge(Q) -> delete_queue(Q) -> gen_server2:cast(?SERVER, {delete_queue, Q}). +dump_queue(Q) -> + gen_server2:call(?SERVER, {dump_queue, Q}, infinity). + stop() -> gen_server2:call(?SERVER, stop, infinity). @@ -439,7 +446,10 @@ handle_call({length, Q}, _From, State = #dqstate { sequences = Sequences }) -> case ets:lookup(Sequences, Q) of [] -> {reply, 0, State}; [{Q, _ReadSeqId, _WriteSeqId, Length}] -> {reply, Length, State} - end. + end; +handle_call({dump_queue, Q}, _From, State) -> + {Result, State1} = internal_dump_queue(Q, State), + {reply, Result, State1}. handle_cast({publish, Q, MsgId, MsgBody}, State) -> {ok, State1} = internal_publish(Q, MsgId, next, MsgBody, State), @@ -611,28 +621,37 @@ internal_deliver(Q, ReadMsg, State = #dqstate { sequences = Sequences }) -> [] -> {ok, empty, State}; [{Q, SeqId, SeqId, 0}] -> {ok, empty, State}; [{Q, ReadSeqId, WriteSeqId, Length}] when Length > 0 -> - [Obj = - #dq_msg_loc {is_delivered = Delivered, msg_id = MsgId, - next_seq_id = ReadSeqId2}] = - mnesia:dirty_read(rabbit_disk_queue, {Q, ReadSeqId}), - [{MsgId, _RefCount, File, Offset, TotalSize}] = - dets_ets_lookup(State, MsgId), Remaining = Length - 1, - true = ets:insert(Sequences, {Q, ReadSeqId2, WriteSeqId, Remaining}), - ok = - if Delivered -> ok; - true -> - mnesia:dirty_write(rabbit_disk_queue, - Obj #dq_msg_loc {is_delivered = true}) - end, - if ReadMsg -> - {FileHdl, State1} = get_read_handle(File, State), - {ok, {MsgBody, BodySize}} = - read_message_at_offset(FileHdl, Offset, TotalSize), - {ok, {MsgId, MsgBody, BodySize, Delivered, {MsgId, ReadSeqId}, Remaining}, - State1}; - true -> {ok, {MsgId, Delivered, {MsgId, ReadSeqId}, Remaining}, State} - end + {ok, Result, NextReadSeqId, State1} = internal_read_message(Q, ReadSeqId, false, ReadMsg, State), + true = ets:insert(Sequences, {Q, NextReadSeqId, WriteSeqId, Remaining}), + {ok, case Result of + {MsgId, Delivered, {MsgId, ReadSeqId}} -> + {MsgId, Delivered, {MsgId, ReadSeqId}, Remaining}; + {MsgId, MsgBody, BodySize, Delivered, {MsgId, ReadSeqId}} -> + {MsgId, MsgBody, BodySize, Delivered, {MsgId, ReadSeqId}, Remaining} + end, State1} + end. + +internal_read_message(Q, ReadSeqId, FakeDeliver, ReadMsg, State) -> + [Obj = + #dq_msg_loc {is_delivered = Delivered, msg_id = MsgId, + next_seq_id = NextReadSeqId}] = + mnesia:dirty_read(rabbit_disk_queue, {Q, ReadSeqId}), + [{MsgId, _RefCount, File, Offset, TotalSize}] = + dets_ets_lookup(State, MsgId), + ok = + if FakeDeliver orelse Delivered -> ok; + true -> + mnesia:dirty_write(rabbit_disk_queue, + Obj #dq_msg_loc {is_delivered = true}) + end, + if ReadMsg -> + {FileHdl, State1} = get_read_handle(File, State), + {ok, {MsgBody, BodySize}} = + read_message_at_offset(FileHdl, Offset, TotalSize), + {ok, {MsgId, MsgBody, BodySize, Delivered, {MsgId, ReadSeqId}}, + NextReadSeqId, State1}; + true -> {ok, {MsgId, Delivered, {MsgId, ReadSeqId}}, NextReadSeqId, State} end. internal_ack(Q, MsgSeqIds, State) -> @@ -863,7 +882,7 @@ internal_purge(Q, State = #dqstate { sequences = Sequences }) -> mnesia:transaction( fun() -> ok = mnesia:write_lock_table(rabbit_disk_queue), - MsgSeqIds = + {MsgSeqIds, WriteSeqId} = rabbit_misc:unfold( fun (SeqId) when SeqId == WriteSeqId -> false; (SeqId) -> @@ -900,6 +919,22 @@ internal_delete_queue(Q, State) -> end), {ok, State2}. +internal_dump_queue(Q, State = #dqstate { sequences = Sequences }) -> + case ets:lookup(Sequences, Q) of + [] -> {[], State}; + [{Q, ReadSeq, WriteSeq, _Length}] -> + {QList, {WriteSeq, State3}} = + rabbit_misc:unfold( + fun ({SeqId, _State1}) when SeqId == WriteSeq -> + false; + ({SeqId, State1}) -> + {ok, Result, NextReadSeqId, State2} = + internal_read_message(Q, SeqId, true, true, State1), + {true, Result, {NextReadSeqId, State2}} + end, {ReadSeq, State}), + {lists:reverse(QList), State3} + end. + %% ---- ROLLING OVER THE APPEND FILE ---- maybe_roll_to_new_file(Offset, diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 49faba293b..acadf2a0cc 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -429,5 +429,5 @@ unfold(Fun, Init) -> unfold(Fun, Acc, Init) -> case Fun(Init) of {true, E, I} -> unfold(Fun, [E|Acc], I); - false -> Acc + false -> {Acc, Init} end. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 46c641fc93..75b36d6f3d 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -703,6 +703,7 @@ test_disk_queue() -> passed = rdq_test_startup_with_queue_gaps(), passed = rdq_test_redeliver(), passed = rdq_test_purge(), + passed = rdq_test_dump_queue(), passed. rdq_time_tx_publish_commit_deliver_ack(Qs, MsgCount, MsgSizeBytes) -> @@ -898,6 +899,34 @@ rdq_test_purge() -> rdq_stop(), passed. +rdq_test_dump_queue() -> + rdq_virgin(), + rdq_start(), + Msg = <<0:(8*256)>>, + Total = 1000, + All = lists:seq(1,Total), + [rabbit_disk_queue:tx_publish(N, Msg) || N <- All], + rabbit_disk_queue:tx_commit(q, All, []), + io:format("Publish done~n", []), + QList = [{N, Msg, 256, false, {N, (N-1)}} || N <- All], + QList = rabbit_disk_queue:dump_queue(q), + rdq_stop(), + io:format("dump ok undelivered~n", []), + rdq_start(), + lists:foreach( + fun (N) -> + Remaining = Total - N, + {N, Msg, 256, false, SeqId, Remaining} = rabbit_disk_queue:deliver(q) + end, All), + [] = rabbit_disk_queue:dump_queue(q), + rdq_stop(), + io:format("dump ok post delivery~n", []), + rdq_start(), + QList2 = [{N, Msg, 256, true, {N, (N-1)}} || N <- All], + QList2 = rabbit_disk_queue:dump_queue(q), + io:format("dump ok post delivery + restart~n", []), + passed. + rdq_time_commands(Funcs) -> lists:foreach(fun (F) -> F() end, Funcs). |
