summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2010-05-24 13:54:20 +0100
committerMatthias Radestock <matthias@lshift.net>2010-05-24 13:54:20 +0100
commit8a6e5ad71035c40b6ace5ca5bddcc8cb626d0f6c (patch)
tree70d9506b6c85e2e3c60dc4dde2e1c483607c7ff2 /src
parent239aad5cf469d24c54e93c9137f3bb5717a089b2 (diff)
parent3be640d0add86b327c17274a34e6231e0b70f66a (diff)
downloadrabbitmq-server-git-8a6e5ad71035c40b6ace5ca5bddcc8cb626d0f6c.tar.gz
merge heads
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_queue_index.erl53
-rw-r--r--src/rabbit_tests.erl17
-rw-r--r--src/rabbit_variable_queue.erl2
3 files changed, 34 insertions, 38 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 93dce89e05..02d0d8ad41 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -31,7 +31,7 @@
-module(rabbit_queue_index).
--export([init/3, terminate/2, terminate_and_erase/1, publish/4,
+-export([init/3, terminate/2, delete_and_terminate/1, publish/4,
deliver/2, ack/2, sync/2, flush/1, read/3,
next_segment_boundary/1, bounds/1, recover/1]).
@@ -188,7 +188,7 @@
-spec(init/3 :: (queue_name(), boolean(), fun ((guid()) -> boolean())) ->
{'undefined' | non_neg_integer(), [any()], qistate()}).
-spec(terminate/2 :: ([any()], qistate()) -> qistate()).
--spec(terminate_and_erase/1 :: (qistate()) -> qistate()).
+-spec(delete_and_terminate/1 :: (qistate()) -> qistate()).
-spec(publish/4 :: (guid(), seq_id(), boolean(), qistate()) -> qistate()).
-spec(deliver/2 :: (seq_id(), qistate()) -> qistate()).
-spec(ack/2 :: ([seq_id()], qistate()) -> qistate()).
@@ -225,11 +225,13 @@ init(Name, MsgStoreRecovered, ContainsCheckFun) ->
{Count, Terms, State1}.
terminate(Terms, State) ->
- terminate(true, Terms, State).
+ {SegmentCounts, State1 = #qistate { dir = Dir }} = terminate(State),
+ store_clean_shutdown([{segments, SegmentCounts} | Terms], Dir),
+ State1.
-terminate_and_erase(State) ->
- State1 = terminate(false, [], State),
- ok = rabbit_misc:recursive_delete([State1 #qistate.dir]),
+delete_and_terminate(State) ->
+ {_SegmentCounts, State1 = #qistate { dir = Dir }} = terminate(State),
+ ok = rabbit_misc:recursive_delete([Dir]),
State1.
publish(Guid, SeqId, IsPersistent, State) when is_binary(Guid) ->
@@ -406,7 +408,7 @@ init_clean(RecoveredCounts, State) ->
lists:foldl(
fun ({Seg, UnackedCount}, SegmentsN) ->
Segment = segment_find_or_new(Seg, Dir, SegmentsN),
- segment_store(Segment #segment {unacked = UnackedCount },
+ segment_store(Segment #segment { unacked = UnackedCount },
SegmentsN)
end, Segments, RecoveredCounts),
%% the counts above include transient messages, which would be the
@@ -415,7 +417,7 @@ init_clean(RecoveredCounts, State) ->
init_dirty(CleanShutdown, ContainsCheckFun, State) ->
%% Recover the journal completely. This will also load segments
- %% which have entries in the journal and remove duplicates. The
+ %% which have entries in the journal and remove duplicates. The
%% counts will correctly reflect the combination of the segment
%% and the journal.
State1 = #qistate { dir = Dir, segments = Segments } =
@@ -438,29 +440,24 @@ init_dirty(CleanShutdown, ContainsCheckFun, State) ->
State2 = flush_journal(State1 #qistate { segments = Segments1 }),
{Count, State2}.
-terminate(_StoreShutdown, _Terms, State = #qistate { segments = undefined }) ->
- State;
-terminate(StoreShutdown, Terms, State =
- #qistate { journal_handle = JournalHdl,
- dir = Dir, segments = Segments }) ->
+terminate(State = #qistate { journal_handle = JournalHdl,
+ segments = Segments }) ->
ok = case JournalHdl of
undefined -> ok;
_ -> file_handle_cache:close(JournalHdl)
end,
- SegTerms = segment_fold(
- fun (Seg, #segment { handle = Hdl,
- unacked = UnackedCount }, SegTermsAcc) ->
- ok = case Hdl of
- undefined -> ok;
- _ -> file_handle_cache:close(Hdl)
- end,
- [{Seg, UnackedCount} | SegTermsAcc]
- end, [], Segments),
- case StoreShutdown of
- true -> store_clean_shutdown([{segments, SegTerms} | Terms], Dir);
- false -> ok
- end,
- State #qistate { journal_handle = undefined, segments = undefined }.
+ SegmentCounts =
+ segment_fold(
+ fun (Seg, #segment { handle = Hdl, unacked = UnackedCount },
+ SegmentCountsAcc) ->
+ ok = case Hdl of
+ undefined -> ok;
+ _ -> file_handle_cache:close(Hdl)
+ end,
+ [{Seg, UnackedCount} | SegmentCountsAcc]
+ end, [], Segments),
+ {SegmentCounts, State #qistate { journal_handle = undefined,
+ segments = undefined }}.
recover_segment(ContainsCheckFun, CleanShutdown, Segment) ->
{SegEntries, UnackedCount, Segment1} = load_segment(false, Segment),
@@ -530,7 +527,7 @@ queue_index_walker_reader(QueueName, Gatherer) ->
{Guid, _SeqId, true, _IsDelivered} <- Messages],
State3
end, State, all_segment_nums(State)),
- _State = terminate(false, [], State1),
+ {_SegmentCounts, _State} = terminate(State1),
ok = gatherer:finish(Gatherer).
%%----------------------------------------------------------------------------
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 49ebb32ba3..e1ee5d080c 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1395,7 +1395,7 @@ test_queue() ->
empty_test_queue() ->
ok = rabbit_variable_queue:start([]),
{0, _Terms, Qi1} = test_queue_init(),
- _Qi2 = rabbit_queue_index:terminate_and_erase(Qi1),
+ _Qi2 = rabbit_queue_index:delete_and_terminate(Qi1),
ok.
queue_index_publish(SeqIds, Persistent, Qi) ->
@@ -1455,8 +1455,7 @@ test_queue_index() ->
{ReadA, undefined, Qi4} = rabbit_queue_index:read(0, SegmentSize, Qi3),
ok = verify_read_with_published(false, false, ReadA,
lists:reverse(SeqIdsGuidsA)),
- %% call terminate twice to prove it's idempotent
- _Qi5 = rabbit_queue_index:terminate([], rabbit_queue_index:terminate([], Qi4)),
+ _Qi5 = rabbit_queue_index:terminate([], Qi4),
ok = stop_msg_store(),
ok = rabbit_variable_queue:start([test_queue()]),
%% should get length back as 0, as all the msgs were transient
@@ -1487,7 +1486,7 @@ test_queue_index() ->
ok = rabbit_variable_queue:start([test_queue()]),
%% should get length back as 0 because all persistent msgs have been acked
{0, _Terms3, Qi20} = test_queue_init(),
- _Qi21 = rabbit_queue_index:terminate_and_erase(Qi20),
+ _Qi21 = rabbit_queue_index:delete_and_terminate(Qi20),
ok = stop_msg_store(),
ok = empty_test_queue(),
@@ -1501,7 +1500,7 @@ test_queue_index() ->
Qi25 = rabbit_queue_index:ack(SeqIdsC, Qi24),
Qi26 = queue_index_flush(Qi25),
{Qi27, _SeqIdsGuidsC1} = queue_index_publish([SegmentSize], false, Qi26),
- _Qi28 = rabbit_queue_index:terminate_and_erase(Qi27),
+ _Qi28 = rabbit_queue_index:delete_and_terminate(Qi27),
ok = stop_msg_store(),
ok = empty_test_queue(),
@@ -1512,7 +1511,7 @@ test_queue_index() ->
{Qi32, _SeqIdsGuidsC3} = queue_index_publish([SegmentSize], false, Qi31),
Qi33 = rabbit_queue_index:ack(SeqIdsC, Qi32),
Qi34 = queue_index_flush(Qi33),
- _Qi35 = rabbit_queue_index:terminate_and_erase(Qi34),
+ _Qi35 = rabbit_queue_index:delete_and_terminate(Qi34),
ok = stop_msg_store(),
ok = empty_test_queue(),
@@ -1523,7 +1522,7 @@ test_queue_index() ->
Qi38 = queue_index_deliver(SeqIdsD, Qi37),
Qi39 = rabbit_queue_index:ack(SeqIdsD, Qi38),
Qi40 = queue_index_flush(Qi39),
- _Qi41 = rabbit_queue_index:terminate_and_erase(Qi40),
+ _Qi41 = rabbit_queue_index:delete_and_terminate(Qi40),
ok = stop_msg_store(),
ok = empty_test_queue(),
@@ -1543,7 +1542,7 @@ test_queue_index() ->
ok = verify_read_with_published(true, false, ReadD, [Four, Five, Six]),
{ReadE, undefined, Qi52} = rabbit_queue_index:read(7, 9, Qi51),
ok = verify_read_with_published(false, false, ReadE, [Seven, Eight]),
- _Qi53 = rabbit_queue_index:terminate_and_erase(Qi52),
+ _Qi53 = rabbit_queue_index:delete_and_terminate(Qi52),
ok = stop_msg_store(),
ok = empty_test_queue(),
@@ -1564,7 +1563,7 @@ test_queue_index() ->
ok = stop_msg_store(),
ok = rabbit_variable_queue:start([test_queue()]),
{5, _Terms10, Qi64} = test_queue_init(),
- _Qi65 = rabbit_queue_index:terminate_and_erase(Qi64),
+ _Qi65 = rabbit_queue_index:delete_and_terminate(Qi64),
ok = stop_msg_store(),
ok = rabbit_variable_queue:start([]),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index cc876b5e6a..3d485106d3 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -398,7 +398,7 @@ delete_and_terminate(State) ->
{DeltaSeqId, NextSeqId, IndexState3} ->
delete1(PersistentStore, TransientThreshold, NextSeqId, DeltaSeqId, IndexState3)
end,
- IndexState5 = rabbit_queue_index:terminate_and_erase(IndexState2),
+ IndexState5 = rabbit_queue_index:delete_and_terminate(IndexState2),
rabbit_msg_store:delete_client(PersistentStore, PRef),
rabbit_msg_store:delete_client(?TRANSIENT_MSG_STORE, TRef),
rabbit_msg_store:client_terminate(MSCStateP),