diff options
| author | Matthew Sackman <matthew@lshift.net> | 2010-04-12 15:27:36 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2010-04-12 15:27:36 +0100 |
| commit | 3de13e6483e3c3ebf3886b0862a22bedb9b58e77 (patch) | |
| tree | df73cc82949a47530668fcc535cd452ed9a85ad3 /src | |
| parent | f57d4ad92f64c59c1d64147db03830de1abba17b (diff) | |
| download | rabbitmq-server-git-3de13e6483e3c3ebf3886b0862a22bedb9b58e77.tar.gz | |
Reworked msg_store startup sequencing so that qi need not know about the msg_store
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_msg_store.erl | 15 | ||||
| -rw-r--r-- | src/rabbit_queue_index.erl | 33 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 37 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 16 |
4 files changed, 56 insertions, 45 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 800b2061dd..4ac4a16edc 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -33,7 +33,7 @@ -behaviour(gen_server2). --export([start_link/5, write/4, read/3, contains/2, remove/2, release/2, +-export([start_link/4, write/4, read/3, contains/2, remove/2, release/2, sync/3, client_init/2, client_terminate/1, delete_client/2, clean/2, successfully_recovered_state/1]). @@ -119,10 +119,9 @@ dedup_cache_ets :: tid(), cur_file_cache_ets :: tid() }). --spec(start_link/5 :: - (atom(), file_path(), [binary()] | 'undefined', - (fun ((A) -> 'finished' | {guid(), non_neg_integer(), A})), A) -> - {'ok', pid()} | 'ignore' | {'error', any()}). +-spec(start_link/4 :: + (atom(), file_path(), [binary()] | 'undefined', startup_fun_state()) -> + {'ok', pid()} | 'ignore' | {'error', any()}). -spec(write/4 :: (server(), guid(), msg(), client_msstate()) -> {'ok', client_msstate()}). -spec(read/3 :: (server(), guid(), client_msstate()) -> @@ -302,9 +301,9 @@ %% public API %%---------------------------------------------------------------------------- -start_link(Server, Dir, ClientRefs, MsgRefDeltaGen, MsgRefDeltaGenInit) -> +start_link(Server, Dir, ClientRefs, StartupFunState) -> gen_server2:start_link({local, Server}, ?MODULE, - [Server, Dir, ClientRefs, MsgRefDeltaGen, MsgRefDeltaGenInit], + [Server, Dir, ClientRefs, StartupFunState], [{timeout, infinity}]). write(Server, Guid, Msg, CState = @@ -498,7 +497,7 @@ close_all_indicated(#client_msstate { file_handles_ets = FileHandlesEts } = %% gen_server callbacks %%---------------------------------------------------------------------------- -init([Server, BaseDir, ClientRefs, MsgRefDeltaGen, MsgRefDeltaGenInit]) -> +init([Server, BaseDir, ClientRefs, {MsgRefDeltaGen, MsgRefDeltaGenInit}]) -> process_flag(trap_exit, true), ok = file_handle_cache:register_callback(?MODULE, set_maximum_since_use, diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 2d9b66738f..4a54fda894 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -31,11 +31,11 @@ -module(rabbit_queue_index). --export([init/2, terminate/2, terminate_and_erase/1, write_published/4, +-export([init/3, terminate/2, terminate_and_erase/1, write_published/4, write_delivered/2, write_acks/2, sync_seq_ids/2, flush_journal/1, read_segment_entries/2, next_segment_boundary/1, segment_size/0, find_lowest_seq_id_seg_and_next_seq_id/1, - start_msg_stores/1]). + prepare_msg_store_seed_funs/1]). -export([queue_index_walker_reader/3]). %% for internal use only @@ -171,8 +171,7 @@ num }). --include("rabbit.hrl"). --include("rabbit_variable_queue.hrl"). +-include("rabbit_msg_store.hrl"). %%---------------------------------------------------------------------------- @@ -195,7 +194,7 @@ dirty_count :: integer() }). --spec(init/2 :: (queue_name(), boolean()) -> +-spec(init/3 :: (queue_name(), boolean(), fun ((guid()) -> boolean())) -> {'undefined' | non_neg_integer(), binary(), binary(), [any()], qistate()}). -spec(terminate/2 :: ([any()], qistate()) -> qistate()). -spec(terminate_and_erase/1 :: (qistate()) -> qistate()). @@ -211,7 +210,10 @@ -spec(segment_size/0 :: () -> non_neg_integer()). -spec(find_lowest_seq_id_seg_and_next_seq_id/1 :: (qistate()) -> {non_neg_integer(), non_neg_integer(), qistate()}). --spec(start_msg_stores/1 :: ([queue_name()]) -> 'ok'). +-spec(prepare_msg_store_seed_funs/1 :: + ([queue_name()]) -> + {{[binary()] | 'undefined', startup_fun_state()}, + {[binary()] | 'undefined', startup_fun_state()}}). -endif. @@ -220,7 +222,7 @@ %% Public API %%---------------------------------------------------------------------------- -init(Name, MsgStoreRecovered) -> +init(Name, MsgStoreRecovered, ContainsCheckFun) -> State = blank_state(Name), {PRef, TRef, Terms} = case read_shutdown_terms(State #qistate.dir) of @@ -269,8 +271,7 @@ init(Name, MsgStoreRecovered) -> Segment3) -> Segment4 = maybe_add_to_journal( - rabbit_msg_store:contains( - ?PERSISTENT_MSG_STORE, Guid), + ContainsCheckFun(Guid), CleanShutdown, Del, RelSeq, Segment3), Segment4 end, Segment1 #segment { pubs = PubCount, @@ -428,12 +429,7 @@ find_lowest_seq_id_seg_and_next_seq_id(State) -> end, {LowSeqIdSeg, NextSeqId, State}. -start_msg_stores(DurableQueues) -> - ok = rabbit_msg_store:clean(?TRANSIENT_MSG_STORE, rabbit_mnesia:dir()), - ok = rabbit_sup:start_child( - ?TRANSIENT_MSG_STORE, rabbit_msg_store, - [?TRANSIENT_MSG_STORE, rabbit_mnesia:dir(), undefined, - fun (ok) -> finished end, ok]), +prepare_msg_store_seed_funs(DurableQueues) -> DurableDict = dict:from_list([ {queue_name_to_dir_name(Queue), Queue} || Queue <- DurableQueues ]), @@ -470,15 +466,12 @@ start_msg_stores(DurableQueues) -> {DurableAcc, [QueueDir | TransientAcc], RefsAcc} end end, {[], [], []}, Directories), - ok = rabbit_sup:start_child( - ?PERSISTENT_MSG_STORE, rabbit_msg_store, - [?PERSISTENT_MSG_STORE, rabbit_mnesia:dir(), DurableRefs, - fun queue_index_walker/1, DurableQueueNames]), lists:foreach(fun (DirName) -> Dir = filename:join(queues_dir(), DirName), ok = delete_queue_directory(Dir) end, TransientDirs), - ok. + {{undefined, {fun (ok) -> finished end, ok}}, + {DurableRefs, {fun queue_index_walker/1, DurableQueueNames}}}. %%---------------------------------------------------------------------------- %% Msg Store Startup Delta Function diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index b186538ba1..a826cc6220 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -999,7 +999,7 @@ start_msg_store(MsgRefDeltaGen, MsgRefDeltaGenInit) -> ok = rabbit_sup:start_child( ?PERSISTENT_MSG_STORE, rabbit_msg_store, [?PERSISTENT_MSG_STORE, rabbit_mnesia:dir(), undefined, - MsgRefDeltaGen, MsgRefDeltaGenInit]), + {MsgRefDeltaGen, MsgRefDeltaGenInit}]), start_transient_msg_store(). start_transient_msg_store() -> @@ -1007,7 +1007,7 @@ start_transient_msg_store() -> ok = rabbit_sup:start_child( ?TRANSIENT_MSG_STORE, rabbit_msg_store, [?TRANSIENT_MSG_STORE, rabbit_mnesia:dir(), undefined, - fun (ok) -> finished end, ok]). + {fun (ok) -> finished end, ok}]). stop_msg_store() -> case supervisor:terminate_child(rabbit_sup, ?PERSISTENT_MSG_STORE) of @@ -1198,8 +1198,8 @@ test_queue() -> queue_name(test). empty_test_queue() -> - ok = rabbit_queue_index:start_msg_stores([]), - {0, _PRef, _TRef, _Terms, Qi1} = rabbit_queue_index:init(test_queue(), false), + ok = rabbit_variable_queue:start([]), + {0, _PRef, _TRef, _Terms, Qi1} = test_queue_init(), _Qi2 = rabbit_queue_index:terminate_and_erase(Qi1), ok. @@ -1241,6 +1241,13 @@ verify_read_with_published(Delivered, Persistent, verify_read_with_published(_Delivered, _Persistent, _Read, _Published) -> ko. +test_queue_init() -> + rabbit_queue_index:init( + test_queue(), false, + fun (Guid) -> + rabbit_msg_store:contains(?PERSISTENT_MSG_STORE, Guid) + end). + test_queue_index() -> SegmentSize = rabbit_queue_index:segment_size(), TwoSegs = SegmentSize + SegmentSize, @@ -1248,7 +1255,7 @@ test_queue_index() -> ok = empty_test_queue(), SeqIdsA = lists:seq(0,9999), SeqIdsB = lists:seq(10000,19999), - {0, _PRef, _TRef, _Terms, Qi0} = rabbit_queue_index:init(test_queue(), false), + {0, _PRef, _TRef, _Terms, Qi0} = test_queue_init(), {0, 0, Qi1} = rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi0), {Qi2, SeqIdsGuidsA} = queue_index_publish(SeqIdsA, false, Qi1), @@ -1260,9 +1267,9 @@ test_queue_index() -> %% call terminate twice to prove it's idempotent _Qi5 = rabbit_queue_index:terminate([], rabbit_queue_index:terminate([], Qi4)), ok = stop_msg_store(), - ok = rabbit_queue_index:start_msg_stores([test_queue()]), + ok = rabbit_variable_queue:start([test_queue()]), %% should get length back as 0, as all the msgs were transient - {0, _PRef1, _TRef1, _Terms1, Qi6} = rabbit_queue_index:init(test_queue(), false), + {0, _PRef1, _TRef1, _Terms1, Qi6} = test_queue_init(), {0, 0, Qi7} = rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi6), {Qi8, SeqIdsGuidsB} = queue_index_publish(SeqIdsB, true, Qi7), @@ -1273,10 +1280,10 @@ test_queue_index() -> lists:reverse(SeqIdsGuidsB)), _Qi11 = rabbit_queue_index:terminate([], Qi10), ok = stop_msg_store(), - ok = rabbit_queue_index:start_msg_stores([test_queue()]), + ok = rabbit_variable_queue:start([test_queue()]), %% should get length back as 10000 LenB = length(SeqIdsB), - {LenB, _PRef2, _TRef2, _Terms2, Qi12} = rabbit_queue_index:init(test_queue(), false), + {LenB, _PRef2, _TRef2, _Terms2, Qi12} = test_queue_init(), {0, TwoSegs, Qi13} = rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi12), Qi14 = queue_index_deliver(SeqIdsB, Qi13), @@ -1290,9 +1297,9 @@ test_queue_index() -> rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi17), _Qi19 = rabbit_queue_index:terminate([], Qi18), ok = stop_msg_store(), - ok = rabbit_queue_index:start_msg_stores([test_queue()]), + ok = rabbit_variable_queue:start([test_queue()]), %% should get length back as 0 because all persistent msgs have been acked - {0, _PRef3, _TRef3, _Terms3, Qi20} = rabbit_queue_index:init(test_queue(), false), + {0, _PRef3, _TRef3, _Terms3, Qi20} = test_queue_init(), _Qi21 = rabbit_queue_index:terminate_and_erase(Qi20), ok = stop_msg_store(), ok = empty_test_queue(), @@ -1301,7 +1308,7 @@ test_queue_index() -> %% First, partials: %% a) partial pub+del+ack, then move to new segment SeqIdsC = lists:seq(0,trunc(SegmentSize/2)), - {0, _PRef4, _TRef4, _Terms4, Qi22} = rabbit_queue_index:init(test_queue(), false), + {0, _PRef4, _TRef4, _Terms4, Qi22} = test_queue_init(), {Qi23, _SeqIdsGuidsC} = queue_index_publish(SeqIdsC, false, Qi22), Qi24 = queue_index_deliver(SeqIdsC, Qi23), Qi25 = rabbit_queue_index:write_acks(SeqIdsC, Qi24), @@ -1312,7 +1319,7 @@ test_queue_index() -> ok = empty_test_queue(), %% b) partial pub+del, then move to new segment, then ack all in old segment - {0, _PRef5, _TRef5, _Terms5, Qi29} = rabbit_queue_index:init(test_queue(), false), + {0, _PRef5, _TRef5, _Terms5, Qi29} = test_queue_init(), {Qi30, _SeqIdsGuidsC2} = queue_index_publish(SeqIdsC, false, Qi29), Qi31 = queue_index_deliver(SeqIdsC, Qi30), {Qi32, _SeqIdsGuidsC3} = queue_index_publish([SegmentSize], false, Qi31), @@ -1324,14 +1331,14 @@ test_queue_index() -> %% c) just fill up several segments of all pubs, then +dels, then +acks SeqIdsD = lists:seq(0,SegmentSize*4), - {0, _PRef6, _TRef6, _Terms6, Qi36} = rabbit_queue_index:init(test_queue(), false), + {0, _PRef6, _TRef6, _Terms6, Qi36} = test_queue_init(), {Qi37, _SeqIdsGuidsD} = queue_index_publish(SeqIdsD, false, Qi36), Qi38 = queue_index_deliver(SeqIdsD, Qi37), Qi39 = rabbit_queue_index:write_acks(SeqIdsD, Qi38), Qi40 = queue_index_flush_journal(Qi39), _Qi41 = rabbit_queue_index:terminate_and_erase(Qi40), ok = stop_msg_store(), - ok = rabbit_queue_index:start_msg_stores([]), + ok = rabbit_variable_queue:start([]), ok = stop_msg_store(), passed. diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index bbf78bb7e9..72fba950c7 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -253,7 +253,15 @@ %%---------------------------------------------------------------------------- start(DurableQueues) -> - rabbit_queue_index:start_msg_stores(DurableQueues). + ok = rabbit_msg_store:clean(?TRANSIENT_MSG_STORE, rabbit_mnesia:dir()), + {{TransRefs, TransStartFunState}, {PersistRefs, PersistStartFunState}} + = rabbit_queue_index:prepare_msg_store_seed_funs(DurableQueues), + ok = rabbit_sup:start_child(?TRANSIENT_MSG_STORE, rabbit_msg_store, + [?TRANSIENT_MSG_STORE, rabbit_mnesia:dir(), + TransRefs, TransStartFunState]), + ok = rabbit_sup:start_child(?PERSISTENT_MSG_STORE, rabbit_msg_store, + [?PERSISTENT_MSG_STORE, rabbit_mnesia:dir(), + PersistRefs, PersistStartFunState]). init(QueueName, IsDurable) -> PersistentStore = case IsDurable of @@ -262,8 +270,12 @@ init(QueueName, IsDurable) -> end, MsgStoreRecovered = rabbit_msg_store:successfully_recovered_state(PersistentStore), + ContainsCheckFun = + fun (Guid) -> + rabbit_msg_store:contains(?PERSISTENT_MSG_STORE, Guid) + end, {DeltaCount, PRef, TRef, Terms, IndexState} = - rabbit_queue_index:init(QueueName, MsgStoreRecovered), + rabbit_queue_index:init(QueueName, MsgStoreRecovered, ContainsCheckFun), {DeltaSeqId, NextSeqId, IndexState1} = rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(IndexState), |
