diff options
| author | Matthias Radestock <matthias@lshift.net> | 2010-05-24 13:54:20 +0100 |
|---|---|---|
| committer | Matthias Radestock <matthias@lshift.net> | 2010-05-24 13:54:20 +0100 |
| commit | 8a6e5ad71035c40b6ace5ca5bddcc8cb626d0f6c (patch) | |
| tree | 70d9506b6c85e2e3c60dc4dde2e1c483607c7ff2 | |
| parent | 239aad5cf469d24c54e93c9137f3bb5717a089b2 (diff) | |
| parent | 3be640d0add86b327c17274a34e6231e0b70f66a (diff) | |
| download | rabbitmq-server-git-8a6e5ad71035c40b6ace5ca5bddcc8cb626d0f6c.tar.gz | |
merge heads
| -rw-r--r-- | src/rabbit_queue_index.erl | 53 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 17 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 2 |
3 files changed, 34 insertions, 38 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 93dce89e05..02d0d8ad41 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -31,7 +31,7 @@ -module(rabbit_queue_index). --export([init/3, terminate/2, terminate_and_erase/1, publish/4, +-export([init/3, terminate/2, delete_and_terminate/1, publish/4, deliver/2, ack/2, sync/2, flush/1, read/3, next_segment_boundary/1, bounds/1, recover/1]). @@ -188,7 +188,7 @@ -spec(init/3 :: (queue_name(), boolean(), fun ((guid()) -> boolean())) -> {'undefined' | non_neg_integer(), [any()], qistate()}). -spec(terminate/2 :: ([any()], qistate()) -> qistate()). --spec(terminate_and_erase/1 :: (qistate()) -> qistate()). +-spec(delete_and_terminate/1 :: (qistate()) -> qistate()). -spec(publish/4 :: (guid(), seq_id(), boolean(), qistate()) -> qistate()). -spec(deliver/2 :: (seq_id(), qistate()) -> qistate()). -spec(ack/2 :: ([seq_id()], qistate()) -> qistate()). @@ -225,11 +225,13 @@ init(Name, MsgStoreRecovered, ContainsCheckFun) -> {Count, Terms, State1}. terminate(Terms, State) -> - terminate(true, Terms, State). + {SegmentCounts, State1 = #qistate { dir = Dir }} = terminate(State), + store_clean_shutdown([{segments, SegmentCounts} | Terms], Dir), + State1. -terminate_and_erase(State) -> - State1 = terminate(false, [], State), - ok = rabbit_misc:recursive_delete([State1 #qistate.dir]), +delete_and_terminate(State) -> + {_SegmentCounts, State1 = #qistate { dir = Dir }} = terminate(State), + ok = rabbit_misc:recursive_delete([Dir]), State1. publish(Guid, SeqId, IsPersistent, State) when is_binary(Guid) -> @@ -406,7 +408,7 @@ init_clean(RecoveredCounts, State) -> lists:foldl( fun ({Seg, UnackedCount}, SegmentsN) -> Segment = segment_find_or_new(Seg, Dir, SegmentsN), - segment_store(Segment #segment {unacked = UnackedCount }, + segment_store(Segment #segment { unacked = UnackedCount }, SegmentsN) end, Segments, RecoveredCounts), %% the counts above include transient messages, which would be the @@ -415,7 +417,7 @@ init_clean(RecoveredCounts, State) -> init_dirty(CleanShutdown, ContainsCheckFun, State) -> %% Recover the journal completely. This will also load segments - %% which have entries in the journal and remove duplicates. The + %% which have entries in the journal and remove duplicates. The %% counts will correctly reflect the combination of the segment %% and the journal. State1 = #qistate { dir = Dir, segments = Segments } = @@ -438,29 +440,24 @@ init_dirty(CleanShutdown, ContainsCheckFun, State) -> State2 = flush_journal(State1 #qistate { segments = Segments1 }), {Count, State2}. -terminate(_StoreShutdown, _Terms, State = #qistate { segments = undefined }) -> - State; -terminate(StoreShutdown, Terms, State = - #qistate { journal_handle = JournalHdl, - dir = Dir, segments = Segments }) -> +terminate(State = #qistate { journal_handle = JournalHdl, + segments = Segments }) -> ok = case JournalHdl of undefined -> ok; _ -> file_handle_cache:close(JournalHdl) end, - SegTerms = segment_fold( - fun (Seg, #segment { handle = Hdl, - unacked = UnackedCount }, SegTermsAcc) -> - ok = case Hdl of - undefined -> ok; - _ -> file_handle_cache:close(Hdl) - end, - [{Seg, UnackedCount} | SegTermsAcc] - end, [], Segments), - case StoreShutdown of - true -> store_clean_shutdown([{segments, SegTerms} | Terms], Dir); - false -> ok - end, - State #qistate { journal_handle = undefined, segments = undefined }. + SegmentCounts = + segment_fold( + fun (Seg, #segment { handle = Hdl, unacked = UnackedCount }, + SegmentCountsAcc) -> + ok = case Hdl of + undefined -> ok; + _ -> file_handle_cache:close(Hdl) + end, + [{Seg, UnackedCount} | SegmentCountsAcc] + end, [], Segments), + {SegmentCounts, State #qistate { journal_handle = undefined, + segments = undefined }}. recover_segment(ContainsCheckFun, CleanShutdown, Segment) -> {SegEntries, UnackedCount, Segment1} = load_segment(false, Segment), @@ -530,7 +527,7 @@ queue_index_walker_reader(QueueName, Gatherer) -> {Guid, _SeqId, true, _IsDelivered} <- Messages], State3 end, State, all_segment_nums(State)), - _State = terminate(false, [], State1), + {_SegmentCounts, _State} = terminate(State1), ok = gatherer:finish(Gatherer). %%---------------------------------------------------------------------------- diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 49ebb32ba3..e1ee5d080c 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1395,7 +1395,7 @@ test_queue() -> empty_test_queue() -> ok = rabbit_variable_queue:start([]), {0, _Terms, Qi1} = test_queue_init(), - _Qi2 = rabbit_queue_index:terminate_and_erase(Qi1), + _Qi2 = rabbit_queue_index:delete_and_terminate(Qi1), ok. queue_index_publish(SeqIds, Persistent, Qi) -> @@ -1455,8 +1455,7 @@ test_queue_index() -> {ReadA, undefined, Qi4} = rabbit_queue_index:read(0, SegmentSize, Qi3), ok = verify_read_with_published(false, false, ReadA, lists:reverse(SeqIdsGuidsA)), - %% call terminate twice to prove it's idempotent - _Qi5 = rabbit_queue_index:terminate([], rabbit_queue_index:terminate([], Qi4)), + _Qi5 = rabbit_queue_index:terminate([], Qi4), ok = stop_msg_store(), ok = rabbit_variable_queue:start([test_queue()]), %% should get length back as 0, as all the msgs were transient @@ -1487,7 +1486,7 @@ test_queue_index() -> ok = rabbit_variable_queue:start([test_queue()]), %% should get length back as 0 because all persistent msgs have been acked {0, _Terms3, Qi20} = test_queue_init(), - _Qi21 = rabbit_queue_index:terminate_and_erase(Qi20), + _Qi21 = rabbit_queue_index:delete_and_terminate(Qi20), ok = stop_msg_store(), ok = empty_test_queue(), @@ -1501,7 +1500,7 @@ test_queue_index() -> Qi25 = rabbit_queue_index:ack(SeqIdsC, Qi24), Qi26 = queue_index_flush(Qi25), {Qi27, _SeqIdsGuidsC1} = queue_index_publish([SegmentSize], false, Qi26), - _Qi28 = rabbit_queue_index:terminate_and_erase(Qi27), + _Qi28 = rabbit_queue_index:delete_and_terminate(Qi27), ok = stop_msg_store(), ok = empty_test_queue(), @@ -1512,7 +1511,7 @@ test_queue_index() -> {Qi32, _SeqIdsGuidsC3} = queue_index_publish([SegmentSize], false, Qi31), Qi33 = rabbit_queue_index:ack(SeqIdsC, Qi32), Qi34 = queue_index_flush(Qi33), - _Qi35 = rabbit_queue_index:terminate_and_erase(Qi34), + _Qi35 = rabbit_queue_index:delete_and_terminate(Qi34), ok = stop_msg_store(), ok = empty_test_queue(), @@ -1523,7 +1522,7 @@ test_queue_index() -> Qi38 = queue_index_deliver(SeqIdsD, Qi37), Qi39 = rabbit_queue_index:ack(SeqIdsD, Qi38), Qi40 = queue_index_flush(Qi39), - _Qi41 = rabbit_queue_index:terminate_and_erase(Qi40), + _Qi41 = rabbit_queue_index:delete_and_terminate(Qi40), ok = stop_msg_store(), ok = empty_test_queue(), @@ -1543,7 +1542,7 @@ test_queue_index() -> ok = verify_read_with_published(true, false, ReadD, [Four, Five, Six]), {ReadE, undefined, Qi52} = rabbit_queue_index:read(7, 9, Qi51), ok = verify_read_with_published(false, false, ReadE, [Seven, Eight]), - _Qi53 = rabbit_queue_index:terminate_and_erase(Qi52), + _Qi53 = rabbit_queue_index:delete_and_terminate(Qi52), ok = stop_msg_store(), ok = empty_test_queue(), @@ -1564,7 +1563,7 @@ test_queue_index() -> ok = stop_msg_store(), ok = rabbit_variable_queue:start([test_queue()]), {5, _Terms10, Qi64} = test_queue_init(), - _Qi65 = rabbit_queue_index:terminate_and_erase(Qi64), + _Qi65 = rabbit_queue_index:delete_and_terminate(Qi64), ok = stop_msg_store(), ok = rabbit_variable_queue:start([]), diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index cc876b5e6a..3d485106d3 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -398,7 +398,7 @@ delete_and_terminate(State) -> {DeltaSeqId, NextSeqId, IndexState3} -> delete1(PersistentStore, TransientThreshold, NextSeqId, DeltaSeqId, IndexState3) end, - IndexState5 = rabbit_queue_index:terminate_and_erase(IndexState2), + IndexState5 = rabbit_queue_index:delete_and_terminate(IndexState2), rabbit_msg_store:delete_client(PersistentStore, PRef), rabbit_msg_store:delete_client(?TRANSIENT_MSG_STORE, TRef), rabbit_msg_store:client_terminate(MSCStateP), |
