summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2010-05-05 01:08:41 +0100
committerMatthias Radestock <matthias@lshift.net>2010-05-05 01:08:41 +0100
commitdf3c84837b7021031f1c99acbc70f1e36d3fa70f (patch)
tree097877dd3c45d0092e78f81399d349295f875dfc /src
parent04178c056f9c666f31c5f42cf94d76d6191299eb (diff)
downloadrabbitmq-server-git-df3c84837b7021031f1c99acbc70f1e36d3fa70f.tar.gz
made shutdown term structure opaque to qi
only the vq needs to know about it
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_queue_index.erl47
-rw-r--r--src/rabbit_tests.erl16
-rw-r--r--src/rabbit_variable_queue.erl21
3 files changed, 40 insertions, 44 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index aee295ae3b..95df893883 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -196,8 +196,7 @@
{(fun ((A) -> 'finished' | {guid(), non_neg_integer(), A})), A}).
-spec(init/3 :: (queue_name(), boolean(), fun ((guid()) -> boolean())) ->
- {'undefined' |
- non_neg_integer(), binary(), binary(), [any()], qistate()}).
+ {'undefined' | non_neg_integer(), [any()], qistate()}).
-spec(terminate/2 :: ([any()], qistate()) -> qistate()).
-spec(terminate_and_erase/1 :: (qistate()) -> qistate()).
-spec(write_published/4 :: (guid(), seq_id(), boolean(), qistate())
@@ -212,7 +211,7 @@
-spec(segment_size/0 :: () -> non_neg_integer()).
-spec(find_lowest_seq_id_seg_and_next_seq_id/1 :: (qistate()) ->
{non_neg_integer(), non_neg_integer(), qistate()}).
--spec(recover/1 :: ([queue_name()]) -> {[binary()], startup_fun_state()}).
+-spec(recover/1 :: ([queue_name()]) -> {[[any()]], startup_fun_state()}).
-endif.
@@ -223,20 +222,10 @@
init(Name, MsgStoreRecovered, ContainsCheckFun) ->
State = blank_state(Name),
- {PRef, TRef, Terms} =
- case read_shutdown_terms(State #qistate.dir) of
- {error, _} ->
- {rabbit_guid:guid(), rabbit_guid:guid(), []};
- {ok, Terms1} ->
- case [persistent_ref, transient_ref] --
- proplists:get_keys(Terms1) of
- [] ->
- {proplists:get_value(persistent_ref, Terms1),
- proplists:get_value(transient_ref, Terms1), Terms1};
- _ ->
- {rabbit_guid:guid(), rabbit_guid:guid(), []}
- end
- end,
+ Terms = case read_shutdown_terms(State #qistate.dir) of
+ {error, _} -> [];
+ {ok, Terms1} -> Terms1
+ end,
%% 1. Load the journal completely. This will also load segments
%% which have entries in the journal and remove duplicates.
%% The counts will correctly reflect the combination of the
@@ -305,7 +294,7 @@ init(Name, MsgStoreRecovered, ContainsCheckFun) ->
%% artificially set the dirty_count non zero and call flush again
State3 = flush_journal(State2 #qistate { segments = Segments1,
dirty_count = 1 }),
- {Count, PRef, TRef, Terms, State3}.
+ {Count, Terms, State3}.
maybe_add_to_journal( true, true, _Del, _RelSeq, Segment) ->
Segment;
@@ -442,34 +431,28 @@ recover(DurableQueues) ->
[]
end,
DurableDirectories = sets:from_list(dict:fetch_keys(DurableDict)),
- {DurableQueueNames, TransientDirs, DurableRefs} =
+ {DurableQueueNames, TransientDirs, DurableTerms} =
lists:foldl(
- fun (QueueDir, {DurableAcc, TransientAcc, RefsAcc}) ->
+ fun (QueueDir, {DurableAcc, TransientAcc, TermsAcc}) ->
case sets:is_element(QueueDir, DurableDirectories) of
true ->
- RefsAcc1 =
+ TermsAcc1 =
case read_shutdown_terms(
filename:join(QueuesDir, QueueDir)) of
- {error, _} ->
- RefsAcc;
- {ok, Terms} ->
- case proplists:get_value(
- persistent_ref, Terms) of
- undefined -> RefsAcc;
- Ref -> [Ref | RefsAcc]
- end
+ {error, _} -> TermsAcc;
+ {ok, Terms} -> [Terms | TermsAcc]
end,
{[dict:fetch(QueueDir, DurableDict) | DurableAcc],
- TransientAcc, RefsAcc1};
+ TransientAcc, TermsAcc1};
false ->
- {DurableAcc, [QueueDir | TransientAcc], RefsAcc}
+ {DurableAcc, [QueueDir | TransientAcc], TermsAcc}
end
end, {[], [], []}, Directories),
lists:foreach(fun (DirName) ->
Dir = filename:join(queues_dir(), DirName),
ok = rabbit_misc:recursive_delete([Dir])
end, TransientDirs),
- {DurableRefs, {fun queue_index_walker/1, DurableQueueNames}}.
+ {DurableTerms, {fun queue_index_walker/1, DurableQueueNames}}.
%%----------------------------------------------------------------------------
%% Msg Store Startup Delta Function
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 3cac429e4c..c8de79845d 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1214,7 +1214,7 @@ test_queue() ->
empty_test_queue() ->
ok = rabbit_variable_queue:start([]),
- {0, _PRef, _TRef, _Terms, Qi1} = test_queue_init(),
+ {0, _Terms, Qi1} = test_queue_init(),
_Qi2 = rabbit_queue_index:terminate_and_erase(Qi1),
ok.
@@ -1270,7 +1270,7 @@ test_queue_index() ->
ok = empty_test_queue(),
SeqIdsA = lists:seq(0,9999),
SeqIdsB = lists:seq(10000,19999),
- {0, _PRef, _TRef, _Terms, Qi0} = test_queue_init(),
+ {0, _Terms, Qi0} = test_queue_init(),
{0, 0, Qi1} =
rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi0),
{Qi2, SeqIdsGuidsA} = queue_index_publish(SeqIdsA, false, Qi1),
@@ -1284,7 +1284,7 @@ test_queue_index() ->
ok = stop_msg_store(),
ok = rabbit_variable_queue:start([test_queue()]),
%% should get length back as 0, as all the msgs were transient
- {0, _PRef1, _TRef1, _Terms1, Qi6} = test_queue_init(),
+ {0, _Terms1, Qi6} = test_queue_init(),
{0, 0, Qi7} =
rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi6),
{Qi8, SeqIdsGuidsB} = queue_index_publish(SeqIdsB, true, Qi7),
@@ -1298,7 +1298,7 @@ test_queue_index() ->
ok = rabbit_variable_queue:start([test_queue()]),
%% should get length back as 10000
LenB = length(SeqIdsB),
- {LenB, _PRef2, _TRef2, _Terms2, Qi12} = test_queue_init(),
+ {LenB, _Terms2, Qi12} = test_queue_init(),
{0, TwoSegs, Qi13} =
rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi12),
Qi14 = queue_index_deliver(SeqIdsB, Qi13),
@@ -1314,7 +1314,7 @@ test_queue_index() ->
ok = stop_msg_store(),
ok = rabbit_variable_queue:start([test_queue()]),
%% should get length back as 0 because all persistent msgs have been acked
- {0, _PRef3, _TRef3, _Terms3, Qi20} = test_queue_init(),
+ {0, _Terms3, Qi20} = test_queue_init(),
_Qi21 = rabbit_queue_index:terminate_and_erase(Qi20),
ok = stop_msg_store(),
ok = empty_test_queue(),
@@ -1323,7 +1323,7 @@ test_queue_index() ->
%% First, partials:
%% a) partial pub+del+ack, then move to new segment
SeqIdsC = lists:seq(0,trunc(SegmentSize/2)),
- {0, _PRef4, _TRef4, _Terms4, Qi22} = test_queue_init(),
+ {0, _Terms4, Qi22} = test_queue_init(),
{Qi23, _SeqIdsGuidsC} = queue_index_publish(SeqIdsC, false, Qi22),
Qi24 = queue_index_deliver(SeqIdsC, Qi23),
Qi25 = rabbit_queue_index:write_acks(SeqIdsC, Qi24),
@@ -1334,7 +1334,7 @@ test_queue_index() ->
ok = empty_test_queue(),
%% b) partial pub+del, then move to new segment, then ack all in old segment
- {0, _PRef5, _TRef5, _Terms5, Qi29} = test_queue_init(),
+ {0, _Terms5, Qi29} = test_queue_init(),
{Qi30, _SeqIdsGuidsC2} = queue_index_publish(SeqIdsC, false, Qi29),
Qi31 = queue_index_deliver(SeqIdsC, Qi30),
{Qi32, _SeqIdsGuidsC3} = queue_index_publish([SegmentSize], false, Qi31),
@@ -1346,7 +1346,7 @@ test_queue_index() ->
%% c) just fill up several segments of all pubs, then +dels, then +acks
SeqIdsD = lists:seq(0,SegmentSize*4),
- {0, _PRef6, _TRef6, _Terms6, Qi36} = test_queue_init(),
+ {0, _Terms6, Qi36} = test_queue_init(),
{Qi37, _SeqIdsGuidsD} = queue_index_publish(SeqIdsD, false, Qi36),
Qi38 = queue_index_deliver(SeqIdsD, Qi37),
Qi39 = rabbit_queue_index:write_acks(SeqIdsD, Qi38),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 18b3847d89..39ef3ec421 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -257,7 +257,12 @@
start(DurableQueues) ->
ok = rabbit_msg_store:clean(?TRANSIENT_MSG_STORE, rabbit_mnesia:dir()),
- {Refs, StartFunState} = rabbit_queue_index:recover(DurableQueues),
+ {AllTerms, StartFunState} = rabbit_queue_index:recover(DurableQueues),
+ Refs = [Ref || Terms <- AllTerms,
+ begin
+ Ref = proplists:get_value(persistent_ref, Terms),
+ Ref =/= undefined
+ end],
ok = rabbit_sup:start_child(?TRANSIENT_MSG_STORE, rabbit_msg_store,
[?TRANSIENT_MSG_STORE, rabbit_mnesia:dir(),
undefined, {fun (ok) -> finished end, ok}]),
@@ -276,12 +281,19 @@ init(QueueName, IsDurable, _Recover) ->
fun (Guid) ->
rabbit_msg_store:contains(?PERSISTENT_MSG_STORE, Guid)
end,
- {DeltaCount, PRef, TRef, Terms, IndexState} =
+ {DeltaCount, Terms, IndexState} =
rabbit_queue_index:init(QueueName, MsgStoreRecovered, ContainsCheckFun),
{DeltaSeqId, NextSeqId, IndexState1} =
rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(IndexState),
- DeltaCount1 = proplists:get_value(persistent_count, Terms, DeltaCount),
+ {PRef, TRef, Terms1} =
+ case [persistent_ref, transient_ref] -- proplists:get_keys(Terms) of
+ [] -> {proplists:get_value(persistent_ref, Terms),
+ proplists:get_value(transient_ref, Terms),
+ Terms};
+ _ -> {rabbit_guid:guid(), rabbit_guid:guid(), []}
+ end,
+ DeltaCount1 = proplists:get_value(persistent_count, Terms1, DeltaCount),
Delta = case DeltaCount1 == 0 andalso DeltaCount /= undefined of
true -> ?BLANK_DELTA;
false -> #delta { start_seq_id = DeltaSeqId,
@@ -329,7 +341,8 @@ terminate(State) ->
remove_pending_ack(true, tx_commit_index(State)),
rabbit_msg_store:client_terminate(MSCStateP),
rabbit_msg_store:client_terminate(MSCStateT),
- Terms = [{persistent_ref, PRef}, {transient_ref, TRef},
+ Terms = [{persistent_ref, PRef},
+ {transient_ref, TRef},
{persistent_count, PCount}],
State1 #vqstate { index_state = rabbit_queue_index:terminate(
Terms, IndexState),