diff options
| author | Matthias Radestock <matthias@lshift.net> | 2010-05-05 01:08:41 +0100 |
|---|---|---|
| committer | Matthias Radestock <matthias@lshift.net> | 2010-05-05 01:08:41 +0100 |
| commit | df3c84837b7021031f1c99acbc70f1e36d3fa70f (patch) | |
| tree | 097877dd3c45d0092e78f81399d349295f875dfc | |
| parent | 04178c056f9c666f31c5f42cf94d76d6191299eb (diff) | |
| download | rabbitmq-server-git-df3c84837b7021031f1c99acbc70f1e36d3fa70f.tar.gz | |
made shutdown term structure opaque to qi
only the vq needs to know about it
| -rw-r--r-- | src/rabbit_queue_index.erl | 47 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 16 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 21 |
3 files changed, 40 insertions, 44 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index aee295ae3b..95df893883 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -196,8 +196,7 @@ {(fun ((A) -> 'finished' | {guid(), non_neg_integer(), A})), A}). -spec(init/3 :: (queue_name(), boolean(), fun ((guid()) -> boolean())) -> - {'undefined' | - non_neg_integer(), binary(), binary(), [any()], qistate()}). + {'undefined' | non_neg_integer(), [any()], qistate()}). -spec(terminate/2 :: ([any()], qistate()) -> qistate()). -spec(terminate_and_erase/1 :: (qistate()) -> qistate()). -spec(write_published/4 :: (guid(), seq_id(), boolean(), qistate()) @@ -212,7 +211,7 @@ -spec(segment_size/0 :: () -> non_neg_integer()). -spec(find_lowest_seq_id_seg_and_next_seq_id/1 :: (qistate()) -> {non_neg_integer(), non_neg_integer(), qistate()}). --spec(recover/1 :: ([queue_name()]) -> {[binary()], startup_fun_state()}). +-spec(recover/1 :: ([queue_name()]) -> {[[any()]], startup_fun_state()}). -endif. @@ -223,20 +222,10 @@ init(Name, MsgStoreRecovered, ContainsCheckFun) -> State = blank_state(Name), - {PRef, TRef, Terms} = - case read_shutdown_terms(State #qistate.dir) of - {error, _} -> - {rabbit_guid:guid(), rabbit_guid:guid(), []}; - {ok, Terms1} -> - case [persistent_ref, transient_ref] -- - proplists:get_keys(Terms1) of - [] -> - {proplists:get_value(persistent_ref, Terms1), - proplists:get_value(transient_ref, Terms1), Terms1}; - _ -> - {rabbit_guid:guid(), rabbit_guid:guid(), []} - end - end, + Terms = case read_shutdown_terms(State #qistate.dir) of + {error, _} -> []; + {ok, Terms1} -> Terms1 + end, %% 1. Load the journal completely. This will also load segments %% which have entries in the journal and remove duplicates. %% The counts will correctly reflect the combination of the @@ -305,7 +294,7 @@ init(Name, MsgStoreRecovered, ContainsCheckFun) -> %% artificially set the dirty_count non zero and call flush again State3 = flush_journal(State2 #qistate { segments = Segments1, dirty_count = 1 }), - {Count, PRef, TRef, Terms, State3}. + {Count, Terms, State3}. maybe_add_to_journal( true, true, _Del, _RelSeq, Segment) -> Segment; @@ -442,34 +431,28 @@ recover(DurableQueues) -> [] end, DurableDirectories = sets:from_list(dict:fetch_keys(DurableDict)), - {DurableQueueNames, TransientDirs, DurableRefs} = + {DurableQueueNames, TransientDirs, DurableTerms} = lists:foldl( - fun (QueueDir, {DurableAcc, TransientAcc, RefsAcc}) -> + fun (QueueDir, {DurableAcc, TransientAcc, TermsAcc}) -> case sets:is_element(QueueDir, DurableDirectories) of true -> - RefsAcc1 = + TermsAcc1 = case read_shutdown_terms( filename:join(QueuesDir, QueueDir)) of - {error, _} -> - RefsAcc; - {ok, Terms} -> - case proplists:get_value( - persistent_ref, Terms) of - undefined -> RefsAcc; - Ref -> [Ref | RefsAcc] - end + {error, _} -> TermsAcc; + {ok, Terms} -> [Terms | TermsAcc] end, {[dict:fetch(QueueDir, DurableDict) | DurableAcc], - TransientAcc, RefsAcc1}; + TransientAcc, TermsAcc1}; false -> - {DurableAcc, [QueueDir | TransientAcc], RefsAcc} + {DurableAcc, [QueueDir | TransientAcc], TermsAcc} end end, {[], [], []}, Directories), lists:foreach(fun (DirName) -> Dir = filename:join(queues_dir(), DirName), ok = rabbit_misc:recursive_delete([Dir]) end, TransientDirs), - {DurableRefs, {fun queue_index_walker/1, DurableQueueNames}}. + {DurableTerms, {fun queue_index_walker/1, DurableQueueNames}}. %%---------------------------------------------------------------------------- %% Msg Store Startup Delta Function diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 3cac429e4c..c8de79845d 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1214,7 +1214,7 @@ test_queue() -> empty_test_queue() -> ok = rabbit_variable_queue:start([]), - {0, _PRef, _TRef, _Terms, Qi1} = test_queue_init(), + {0, _Terms, Qi1} = test_queue_init(), _Qi2 = rabbit_queue_index:terminate_and_erase(Qi1), ok. @@ -1270,7 +1270,7 @@ test_queue_index() -> ok = empty_test_queue(), SeqIdsA = lists:seq(0,9999), SeqIdsB = lists:seq(10000,19999), - {0, _PRef, _TRef, _Terms, Qi0} = test_queue_init(), + {0, _Terms, Qi0} = test_queue_init(), {0, 0, Qi1} = rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi0), {Qi2, SeqIdsGuidsA} = queue_index_publish(SeqIdsA, false, Qi1), @@ -1284,7 +1284,7 @@ test_queue_index() -> ok = stop_msg_store(), ok = rabbit_variable_queue:start([test_queue()]), %% should get length back as 0, as all the msgs were transient - {0, _PRef1, _TRef1, _Terms1, Qi6} = test_queue_init(), + {0, _Terms1, Qi6} = test_queue_init(), {0, 0, Qi7} = rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi6), {Qi8, SeqIdsGuidsB} = queue_index_publish(SeqIdsB, true, Qi7), @@ -1298,7 +1298,7 @@ test_queue_index() -> ok = rabbit_variable_queue:start([test_queue()]), %% should get length back as 10000 LenB = length(SeqIdsB), - {LenB, _PRef2, _TRef2, _Terms2, Qi12} = test_queue_init(), + {LenB, _Terms2, Qi12} = test_queue_init(), {0, TwoSegs, Qi13} = rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi12), Qi14 = queue_index_deliver(SeqIdsB, Qi13), @@ -1314,7 +1314,7 @@ test_queue_index() -> ok = stop_msg_store(), ok = rabbit_variable_queue:start([test_queue()]), %% should get length back as 0 because all persistent msgs have been acked - {0, _PRef3, _TRef3, _Terms3, Qi20} = test_queue_init(), + {0, _Terms3, Qi20} = test_queue_init(), _Qi21 = rabbit_queue_index:terminate_and_erase(Qi20), ok = stop_msg_store(), ok = empty_test_queue(), @@ -1323,7 +1323,7 @@ test_queue_index() -> %% First, partials: %% a) partial pub+del+ack, then move to new segment SeqIdsC = lists:seq(0,trunc(SegmentSize/2)), - {0, _PRef4, _TRef4, _Terms4, Qi22} = test_queue_init(), + {0, _Terms4, Qi22} = test_queue_init(), {Qi23, _SeqIdsGuidsC} = queue_index_publish(SeqIdsC, false, Qi22), Qi24 = queue_index_deliver(SeqIdsC, Qi23), Qi25 = rabbit_queue_index:write_acks(SeqIdsC, Qi24), @@ -1334,7 +1334,7 @@ test_queue_index() -> ok = empty_test_queue(), %% b) partial pub+del, then move to new segment, then ack all in old segment - {0, _PRef5, _TRef5, _Terms5, Qi29} = test_queue_init(), + {0, _Terms5, Qi29} = test_queue_init(), {Qi30, _SeqIdsGuidsC2} = queue_index_publish(SeqIdsC, false, Qi29), Qi31 = queue_index_deliver(SeqIdsC, Qi30), {Qi32, _SeqIdsGuidsC3} = queue_index_publish([SegmentSize], false, Qi31), @@ -1346,7 +1346,7 @@ test_queue_index() -> %% c) just fill up several segments of all pubs, then +dels, then +acks SeqIdsD = lists:seq(0,SegmentSize*4), - {0, _PRef6, _TRef6, _Terms6, Qi36} = test_queue_init(), + {0, _Terms6, Qi36} = test_queue_init(), {Qi37, _SeqIdsGuidsD} = queue_index_publish(SeqIdsD, false, Qi36), Qi38 = queue_index_deliver(SeqIdsD, Qi37), Qi39 = rabbit_queue_index:write_acks(SeqIdsD, Qi38), diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 18b3847d89..39ef3ec421 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -257,7 +257,12 @@ start(DurableQueues) -> ok = rabbit_msg_store:clean(?TRANSIENT_MSG_STORE, rabbit_mnesia:dir()), - {Refs, StartFunState} = rabbit_queue_index:recover(DurableQueues), + {AllTerms, StartFunState} = rabbit_queue_index:recover(DurableQueues), + Refs = [Ref || Terms <- AllTerms, + begin + Ref = proplists:get_value(persistent_ref, Terms), + Ref =/= undefined + end], ok = rabbit_sup:start_child(?TRANSIENT_MSG_STORE, rabbit_msg_store, [?TRANSIENT_MSG_STORE, rabbit_mnesia:dir(), undefined, {fun (ok) -> finished end, ok}]), @@ -276,12 +281,19 @@ init(QueueName, IsDurable, _Recover) -> fun (Guid) -> rabbit_msg_store:contains(?PERSISTENT_MSG_STORE, Guid) end, - {DeltaCount, PRef, TRef, Terms, IndexState} = + {DeltaCount, Terms, IndexState} = rabbit_queue_index:init(QueueName, MsgStoreRecovered, ContainsCheckFun), {DeltaSeqId, NextSeqId, IndexState1} = rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(IndexState), - DeltaCount1 = proplists:get_value(persistent_count, Terms, DeltaCount), + {PRef, TRef, Terms1} = + case [persistent_ref, transient_ref] -- proplists:get_keys(Terms) of + [] -> {proplists:get_value(persistent_ref, Terms), + proplists:get_value(transient_ref, Terms), + Terms}; + _ -> {rabbit_guid:guid(), rabbit_guid:guid(), []} + end, + DeltaCount1 = proplists:get_value(persistent_count, Terms1, DeltaCount), Delta = case DeltaCount1 == 0 andalso DeltaCount /= undefined of true -> ?BLANK_DELTA; false -> #delta { start_seq_id = DeltaSeqId, @@ -329,7 +341,8 @@ terminate(State) -> remove_pending_ack(true, tx_commit_index(State)), rabbit_msg_store:client_terminate(MSCStateP), rabbit_msg_store:client_terminate(MSCStateT), - Terms = [{persistent_ref, PRef}, {transient_ref, TRef}, + Terms = [{persistent_ref, PRef}, + {transient_ref, TRef}, {persistent_count, PCount}], State1 #vqstate { index_state = rabbit_queue_index:terminate( Terms, IndexState), |
