diff options
| author | Rob Harrop <rharrop@vmware.com> | 2010-09-15 15:25:30 +0100 |
|---|---|---|
| committer | Rob Harrop <rharrop@vmware.com> | 2010-09-15 15:25:30 +0100 |
| commit | 2dfcd8bab4e0f9a37a05b373e6c673d68578598e (patch) | |
| tree | 28f266feef8aa4990aefd329ba8eb7b2d3df46a7 | |
| parent | ccef72413241141ec7a20e67219cf6f1ee7c5391 (diff) | |
| download | rabbitmq-server-git-2dfcd8bab4e0f9a37a05b373e6c673d68578598e.tar.gz | |
midway through fixing test_queue_recover
| -rw-r--r-- | src/rabbit_queue_index.erl | 10 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 29 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 3 |
3 files changed, 35 insertions, 7 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index c631f7a26b..1b9cd38218 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -427,6 +427,7 @@ init_dirty(CleanShutdown, ContainsCheckFun, State) -> %% find the number of unacked messages. lists:foldl( fun (Seg, {Segments2, CountAcc}) -> + io:format("Recovering: ~p~n", [Seg]), Segment = #segment { unacked = UnackedCount } = recover_segment(ContainsCheckFun, CleanShutdown, segment_find_or_new(Seg, Dir, Segments2)), @@ -584,6 +585,7 @@ maybe_flush_journal(State) -> State. flush_journal(State = #qistate { segments = Segments }) -> + io:format("Flushing journal~n"), Segments1 = segment_fold( fun (#segment { unacked = 0, path = Path }, SegmentsN) -> @@ -709,7 +711,8 @@ all_segment_nums(#qistate { dir = Dir, segments = Segments }) -> segment_find_or_new(Seg, Dir, Segments) -> case segment_find(Seg, Segments) of {ok, Segment} -> Segment; - error -> SegName = integer_to_list(Seg) ++ ?SEGMENT_EXTENSION, + error -> io:format("New Seg~n"), + SegName = integer_to_list(Seg) ++ ?SEGMENT_EXTENSION, Path = filename:join(Dir, SegName), #segment { num = Seg, path = Path, @@ -805,8 +808,10 @@ segment_entries_foldr(Fun, Init, %% %% Does not do any combining with the journal at all. load_segment(KeepAcked, #segment { path = Path }) -> + io:format("path: ~p~n", [Path]), case filelib:is_file(Path) of - false -> {array_new(), 0}; + false -> io:format("Creating new~n"), + {array_new(), 0}; true -> {ok, Hdl} = file_handle_cache:open(Path, ?READ_MODE, []), {ok, 0} = file_handle_cache:position(Hdl, bof), Res = load_segment_entries(KeepAcked, Hdl, array_new(), 0), @@ -815,6 +820,7 @@ load_segment(KeepAcked, #segment { path = Path }) -> end. load_segment_entries(KeepAcked, Hdl, SegEntries, UnackedCount) -> + io:format("Loading seg entries"), case file_handle_cache:read(Hdl, ?REL_SEQ_ONLY_ENTRY_LENGTH_BYTES) of {ok, <<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS, IsPersistentNum:1, RelSeq:?REL_SEQ_BITS>>} -> diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 6e9fd8c8cf..79792786b2 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1414,6 +1414,7 @@ test_backing_queue() -> application:set_env(rabbit, msg_store_file_size_limit, FileSizeLimit, infinity), passed = test_queue_index(), + passed = test_queue_index_props(), passed = test_variable_queue(), passed = test_queue_recover(), application:set_env(rabbit, queue_index_max_journal_entries, @@ -1657,6 +1658,21 @@ verify_read_with_published(Delivered, Persistent, verify_read_with_published(_Delivered, _Persistent, _Read, _Published) -> ko. +test_queue_index_props() -> + with_empty_test_queue( + fun(Qi0) -> + Guid = rabbit_guid:guid(), + Props = #msg_properties{expiry=12345}, + Qi1 = rabbit_queue_index:publish(Guid, 1, Props, true, Qi0), + {[{Guid, 1, Props, _, _}], Qi2} = rabbit_queue_index:read(1, 2, Qi1), + Qi2 + end), + + ok = rabbit_variable_queue:stop(), + ok = rabbit_variable_queue:start([]), + + passed. + test_queue_index() -> SegmentSize = rabbit_queue_index:next_segment_boundary(0), TwoSegs = SegmentSize + SegmentSize, @@ -1798,7 +1814,7 @@ variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) -> lists:foldl(fun (N, {VQN, AckTagsAcc}) -> Rem = Len - N, {{#basic_message { is_persistent = IsPersistent }, - IsDelivered, AckTagN, Rem}, VQM} = + _Props, IsDelivered, AckTagN, Rem}, VQM} = rabbit_variable_queue:fetch(true, VQN), {VQM, [AckTagN | AckTagsAcc]} end, {VQ, []}, lists:seq(1, Count)). @@ -1838,6 +1854,7 @@ test_variable_queue_dynamic_duration_change(VQ0) -> %% squeeze and relax queue Churn = Len div 32, VQ2 = publish_fetch_and_ack(Churn, Len, VQ1), + {Duration, VQ3} = rabbit_variable_queue:ram_duration(VQ2), VQ7 = lists:foldl( fun (Duration1, VQ4) -> @@ -1860,7 +1877,7 @@ publish_fetch_and_ack(0, _Len, VQ0) -> VQ0; publish_fetch_and_ack(N, Len, VQ0) -> VQ1 = variable_queue_publish(false, 1, VQ0), - {{_Msg, false, AckTag, Len}, VQ2} = rabbit_variable_queue:fetch(true, VQ1), + {{_Msg, _MsgProps, false, AckTag, Len}, VQ2} = rabbit_variable_queue:fetch(true, VQ1), publish_fetch_and_ack(N-1, Len, rabbit_variable_queue:ack([AckTag], VQ2)). test_variable_queue_partial_segments_delta_thing(VQ0) -> @@ -1924,7 +1941,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere1(VQ0) -> Count, VQ4), _VQ6 = rabbit_variable_queue:terminate(VQ5), VQ7 = rabbit_variable_queue:init(test_queue(), true, true), - {{_Msg1, true, _AckTag1, Count1}, VQ8} = + {{_Msg1, _Props, true, _AckTag1, Count1}, VQ8} = rabbit_variable_queue:fetch(true, VQ7), VQ9 = variable_queue_publish(false, 1, VQ8), VQ10 = rabbit_variable_queue:set_ram_duration_target(0, VQ9), @@ -1936,7 +1953,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) -> VQ1 = rabbit_variable_queue:set_ram_duration_target(0, VQ0), VQ2 = variable_queue_publish(false, 4, VQ1), {VQ3, AckTags} = variable_queue_fetch(2, false, false, 4, VQ2), - VQ4 = rabbit_variable_queue:requeue(AckTags, VQ3), + VQ4 = rabbit_variable_queue:requeue(AckTags, fun(X) -> X end, VQ3), VQ5 = rabbit_variable_queue:idle_timeout(VQ4), _VQ6 = rabbit_variable_queue:terminate(VQ5), VQ7 = rabbit_variable_queue:init(test_queue(), true, true), @@ -1954,6 +1971,7 @@ test_queue_recover() -> sender = self(), message = Msg}, [true = rabbit_amqqueue:deliver(QPid, Delivery) || _ <- lists:seq(1, Count)], + io:format("Calling commit~n"), rabbit_amqqueue:commit_all([QPid], TxID, self()), exit(QPid, kill), MRef = erlang:monitor(process, QPid), @@ -1961,6 +1979,7 @@ test_queue_recover() -> after 10000 -> exit(timeout_waiting_for_queue_death) end, rabbit_amqqueue:stop(), + io:format("Restarting queue~n"), ok = rabbit_amqqueue:start(), rabbit_amqqueue:with_or_die( QName, @@ -1970,7 +1989,7 @@ test_queue_recover() -> rabbit_amqqueue:basic_get(Q1, self(), false), exit(QPid1, shutdown), VQ1 = rabbit_variable_queue:init(QName, true, true), - {{_Msg1, true, _AckTag1, CountMinusOne}, VQ2} = + {{_Msg1, _Props, true, _AckTag1, CountMinusOne}, VQ2} = rabbit_variable_queue:fetch(true, VQ1), _VQ3 = rabbit_variable_queue:delete_and_terminate(VQ2), rabbit_amqqueue:internal_delete(QName) diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index aee8d47b36..c8d3c6e4e8 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -370,6 +370,7 @@ stop_msg_store() -> ok = rabbit_sup:stop_child(?TRANSIENT_MSG_STORE). init(QueueName, IsDurable, Recover) -> + io:format("Initing ~p ~p~n", [QueueName, Recover]), {DeltaCount, Terms, IndexState} = rabbit_queue_index:init( QueueName, Recover, @@ -377,6 +378,7 @@ init(QueueName, IsDurable, Recover) -> fun (Guid) -> rabbit_msg_store:contains(?PERSISTENT_MSG_STORE, Guid) end), + io:format("Inited: ~p~n", [DeltaCount]), {LowSeqId, NextSeqId, IndexState1} = rabbit_queue_index:bounds(IndexState), {PRef, TRef, Terms1} = @@ -956,6 +958,7 @@ tx_commit_index(State = #vqstate { on_sync = #sync { PAcks = lists:append(SPAcks), Acks = lists:append(SAcks), Pubs = lists:append(lists:reverse(SPubs)), + io:format("Committing: ~p~n", [length(Pubs)]), {SeqIds, State1 = #vqstate { index_state = IndexState }} = lists:foldl( fun ({Msg = #basic_message { is_persistent = IsPersistent }, MsgProperties}, |
