summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2010-04-12 15:27:36 +0100
committerMatthew Sackman <matthew@lshift.net>2010-04-12 15:27:36 +0100
commit3de13e6483e3c3ebf3886b0862a22bedb9b58e77 (patch)
treedf73cc82949a47530668fcc535cd452ed9a85ad3 /src
parentf57d4ad92f64c59c1d64147db03830de1abba17b (diff)
downloadrabbitmq-server-git-3de13e6483e3c3ebf3886b0862a22bedb9b58e77.tar.gz
Reworked msg_store startup sequencing so that qi need not know about the msg_store
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_msg_store.erl15
-rw-r--r--src/rabbit_queue_index.erl33
-rw-r--r--src/rabbit_tests.erl37
-rw-r--r--src/rabbit_variable_queue.erl16
4 files changed, 56 insertions, 45 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 800b2061dd..4ac4a16edc 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -33,7 +33,7 @@
-behaviour(gen_server2).
--export([start_link/5, write/4, read/3, contains/2, remove/2, release/2,
+-export([start_link/4, write/4, read/3, contains/2, remove/2, release/2,
sync/3, client_init/2, client_terminate/1, delete_client/2, clean/2,
successfully_recovered_state/1]).
@@ -119,10 +119,9 @@
dedup_cache_ets :: tid(),
cur_file_cache_ets :: tid() }).
--spec(start_link/5 ::
- (atom(), file_path(), [binary()] | 'undefined',
- (fun ((A) -> 'finished' | {guid(), non_neg_integer(), A})), A) ->
- {'ok', pid()} | 'ignore' | {'error', any()}).
+-spec(start_link/4 ::
+ (atom(), file_path(), [binary()] | 'undefined', startup_fun_state()) ->
+ {'ok', pid()} | 'ignore' | {'error', any()}).
-spec(write/4 :: (server(), guid(), msg(), client_msstate()) ->
{'ok', client_msstate()}).
-spec(read/3 :: (server(), guid(), client_msstate()) ->
@@ -302,9 +301,9 @@
%% public API
%%----------------------------------------------------------------------------
-start_link(Server, Dir, ClientRefs, MsgRefDeltaGen, MsgRefDeltaGenInit) ->
+start_link(Server, Dir, ClientRefs, StartupFunState) ->
gen_server2:start_link({local, Server}, ?MODULE,
- [Server, Dir, ClientRefs, MsgRefDeltaGen, MsgRefDeltaGenInit],
+ [Server, Dir, ClientRefs, StartupFunState],
[{timeout, infinity}]).
write(Server, Guid, Msg, CState =
@@ -498,7 +497,7 @@ close_all_indicated(#client_msstate { file_handles_ets = FileHandlesEts } =
%% gen_server callbacks
%%----------------------------------------------------------------------------
-init([Server, BaseDir, ClientRefs, MsgRefDeltaGen, MsgRefDeltaGenInit]) ->
+init([Server, BaseDir, ClientRefs, {MsgRefDeltaGen, MsgRefDeltaGenInit}]) ->
process_flag(trap_exit, true),
ok = file_handle_cache:register_callback(?MODULE, set_maximum_since_use,
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 2d9b66738f..4a54fda894 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -31,11 +31,11 @@
-module(rabbit_queue_index).
--export([init/2, terminate/2, terminate_and_erase/1, write_published/4,
+-export([init/3, terminate/2, terminate_and_erase/1, write_published/4,
write_delivered/2, write_acks/2, sync_seq_ids/2, flush_journal/1,
read_segment_entries/2, next_segment_boundary/1, segment_size/0,
find_lowest_seq_id_seg_and_next_seq_id/1,
- start_msg_stores/1]).
+ prepare_msg_store_seed_funs/1]).
-export([queue_index_walker_reader/3]). %% for internal use only
@@ -171,8 +171,7 @@
num
}).
--include("rabbit.hrl").
--include("rabbit_variable_queue.hrl").
+-include("rabbit_msg_store.hrl").
%%----------------------------------------------------------------------------
@@ -195,7 +194,7 @@
dirty_count :: integer()
}).
--spec(init/2 :: (queue_name(), boolean()) ->
+-spec(init/3 :: (queue_name(), boolean(), fun ((guid()) -> boolean())) ->
{'undefined' | non_neg_integer(), binary(), binary(), [any()], qistate()}).
-spec(terminate/2 :: ([any()], qistate()) -> qistate()).
-spec(terminate_and_erase/1 :: (qistate()) -> qistate()).
@@ -211,7 +210,10 @@
-spec(segment_size/0 :: () -> non_neg_integer()).
-spec(find_lowest_seq_id_seg_and_next_seq_id/1 :: (qistate()) ->
{non_neg_integer(), non_neg_integer(), qistate()}).
--spec(start_msg_stores/1 :: ([queue_name()]) -> 'ok').
+-spec(prepare_msg_store_seed_funs/1 ::
+ ([queue_name()]) ->
+ {{[binary()] | 'undefined', startup_fun_state()},
+ {[binary()] | 'undefined', startup_fun_state()}}).
-endif.
@@ -220,7 +222,7 @@
%% Public API
%%----------------------------------------------------------------------------
-init(Name, MsgStoreRecovered) ->
+init(Name, MsgStoreRecovered, ContainsCheckFun) ->
State = blank_state(Name),
{PRef, TRef, Terms} =
case read_shutdown_terms(State #qistate.dir) of
@@ -269,8 +271,7 @@ init(Name, MsgStoreRecovered) ->
Segment3) ->
Segment4 =
maybe_add_to_journal(
- rabbit_msg_store:contains(
- ?PERSISTENT_MSG_STORE, Guid),
+ ContainsCheckFun(Guid),
CleanShutdown, Del, RelSeq, Segment3),
Segment4
end, Segment1 #segment { pubs = PubCount,
@@ -428,12 +429,7 @@ find_lowest_seq_id_seg_and_next_seq_id(State) ->
end,
{LowSeqIdSeg, NextSeqId, State}.
-start_msg_stores(DurableQueues) ->
- ok = rabbit_msg_store:clean(?TRANSIENT_MSG_STORE, rabbit_mnesia:dir()),
- ok = rabbit_sup:start_child(
- ?TRANSIENT_MSG_STORE, rabbit_msg_store,
- [?TRANSIENT_MSG_STORE, rabbit_mnesia:dir(), undefined,
- fun (ok) -> finished end, ok]),
+prepare_msg_store_seed_funs(DurableQueues) ->
DurableDict =
dict:from_list([ {queue_name_to_dir_name(Queue), Queue} ||
Queue <- DurableQueues ]),
@@ -470,15 +466,12 @@ start_msg_stores(DurableQueues) ->
{DurableAcc, [QueueDir | TransientAcc], RefsAcc}
end
end, {[], [], []}, Directories),
- ok = rabbit_sup:start_child(
- ?PERSISTENT_MSG_STORE, rabbit_msg_store,
- [?PERSISTENT_MSG_STORE, rabbit_mnesia:dir(), DurableRefs,
- fun queue_index_walker/1, DurableQueueNames]),
lists:foreach(fun (DirName) ->
Dir = filename:join(queues_dir(), DirName),
ok = delete_queue_directory(Dir)
end, TransientDirs),
- ok.
+ {{undefined, {fun (ok) -> finished end, ok}},
+ {DurableRefs, {fun queue_index_walker/1, DurableQueueNames}}}.
%%----------------------------------------------------------------------------
%% Msg Store Startup Delta Function
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index b186538ba1..a826cc6220 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -999,7 +999,7 @@ start_msg_store(MsgRefDeltaGen, MsgRefDeltaGenInit) ->
ok = rabbit_sup:start_child(
?PERSISTENT_MSG_STORE, rabbit_msg_store,
[?PERSISTENT_MSG_STORE, rabbit_mnesia:dir(), undefined,
- MsgRefDeltaGen, MsgRefDeltaGenInit]),
+ {MsgRefDeltaGen, MsgRefDeltaGenInit}]),
start_transient_msg_store().
start_transient_msg_store() ->
@@ -1007,7 +1007,7 @@ start_transient_msg_store() ->
ok = rabbit_sup:start_child(
?TRANSIENT_MSG_STORE, rabbit_msg_store,
[?TRANSIENT_MSG_STORE, rabbit_mnesia:dir(), undefined,
- fun (ok) -> finished end, ok]).
+ {fun (ok) -> finished end, ok}]).
stop_msg_store() ->
case supervisor:terminate_child(rabbit_sup, ?PERSISTENT_MSG_STORE) of
@@ -1198,8 +1198,8 @@ test_queue() ->
queue_name(test).
empty_test_queue() ->
- ok = rabbit_queue_index:start_msg_stores([]),
- {0, _PRef, _TRef, _Terms, Qi1} = rabbit_queue_index:init(test_queue(), false),
+ ok = rabbit_variable_queue:start([]),
+ {0, _PRef, _TRef, _Terms, Qi1} = test_queue_init(),
_Qi2 = rabbit_queue_index:terminate_and_erase(Qi1),
ok.
@@ -1241,6 +1241,13 @@ verify_read_with_published(Delivered, Persistent,
verify_read_with_published(_Delivered, _Persistent, _Read, _Published) ->
ko.
+test_queue_init() ->
+ rabbit_queue_index:init(
+ test_queue(), false,
+ fun (Guid) ->
+ rabbit_msg_store:contains(?PERSISTENT_MSG_STORE, Guid)
+ end).
+
test_queue_index() ->
SegmentSize = rabbit_queue_index:segment_size(),
TwoSegs = SegmentSize + SegmentSize,
@@ -1248,7 +1255,7 @@ test_queue_index() ->
ok = empty_test_queue(),
SeqIdsA = lists:seq(0,9999),
SeqIdsB = lists:seq(10000,19999),
- {0, _PRef, _TRef, _Terms, Qi0} = rabbit_queue_index:init(test_queue(), false),
+ {0, _PRef, _TRef, _Terms, Qi0} = test_queue_init(),
{0, 0, Qi1} =
rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi0),
{Qi2, SeqIdsGuidsA} = queue_index_publish(SeqIdsA, false, Qi1),
@@ -1260,9 +1267,9 @@ test_queue_index() ->
%% call terminate twice to prove it's idempotent
_Qi5 = rabbit_queue_index:terminate([], rabbit_queue_index:terminate([], Qi4)),
ok = stop_msg_store(),
- ok = rabbit_queue_index:start_msg_stores([test_queue()]),
+ ok = rabbit_variable_queue:start([test_queue()]),
%% should get length back as 0, as all the msgs were transient
- {0, _PRef1, _TRef1, _Terms1, Qi6} = rabbit_queue_index:init(test_queue(), false),
+ {0, _PRef1, _TRef1, _Terms1, Qi6} = test_queue_init(),
{0, 0, Qi7} =
rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi6),
{Qi8, SeqIdsGuidsB} = queue_index_publish(SeqIdsB, true, Qi7),
@@ -1273,10 +1280,10 @@ test_queue_index() ->
lists:reverse(SeqIdsGuidsB)),
_Qi11 = rabbit_queue_index:terminate([], Qi10),
ok = stop_msg_store(),
- ok = rabbit_queue_index:start_msg_stores([test_queue()]),
+ ok = rabbit_variable_queue:start([test_queue()]),
%% should get length back as 10000
LenB = length(SeqIdsB),
- {LenB, _PRef2, _TRef2, _Terms2, Qi12} = rabbit_queue_index:init(test_queue(), false),
+ {LenB, _PRef2, _TRef2, _Terms2, Qi12} = test_queue_init(),
{0, TwoSegs, Qi13} =
rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi12),
Qi14 = queue_index_deliver(SeqIdsB, Qi13),
@@ -1290,9 +1297,9 @@ test_queue_index() ->
rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi17),
_Qi19 = rabbit_queue_index:terminate([], Qi18),
ok = stop_msg_store(),
- ok = rabbit_queue_index:start_msg_stores([test_queue()]),
+ ok = rabbit_variable_queue:start([test_queue()]),
%% should get length back as 0 because all persistent msgs have been acked
- {0, _PRef3, _TRef3, _Terms3, Qi20} = rabbit_queue_index:init(test_queue(), false),
+ {0, _PRef3, _TRef3, _Terms3, Qi20} = test_queue_init(),
_Qi21 = rabbit_queue_index:terminate_and_erase(Qi20),
ok = stop_msg_store(),
ok = empty_test_queue(),
@@ -1301,7 +1308,7 @@ test_queue_index() ->
%% First, partials:
%% a) partial pub+del+ack, then move to new segment
SeqIdsC = lists:seq(0,trunc(SegmentSize/2)),
- {0, _PRef4, _TRef4, _Terms4, Qi22} = rabbit_queue_index:init(test_queue(), false),
+ {0, _PRef4, _TRef4, _Terms4, Qi22} = test_queue_init(),
{Qi23, _SeqIdsGuidsC} = queue_index_publish(SeqIdsC, false, Qi22),
Qi24 = queue_index_deliver(SeqIdsC, Qi23),
Qi25 = rabbit_queue_index:write_acks(SeqIdsC, Qi24),
@@ -1312,7 +1319,7 @@ test_queue_index() ->
ok = empty_test_queue(),
%% b) partial pub+del, then move to new segment, then ack all in old segment
- {0, _PRef5, _TRef5, _Terms5, Qi29} = rabbit_queue_index:init(test_queue(), false),
+ {0, _PRef5, _TRef5, _Terms5, Qi29} = test_queue_init(),
{Qi30, _SeqIdsGuidsC2} = queue_index_publish(SeqIdsC, false, Qi29),
Qi31 = queue_index_deliver(SeqIdsC, Qi30),
{Qi32, _SeqIdsGuidsC3} = queue_index_publish([SegmentSize], false, Qi31),
@@ -1324,14 +1331,14 @@ test_queue_index() ->
%% c) just fill up several segments of all pubs, then +dels, then +acks
SeqIdsD = lists:seq(0,SegmentSize*4),
- {0, _PRef6, _TRef6, _Terms6, Qi36} = rabbit_queue_index:init(test_queue(), false),
+ {0, _PRef6, _TRef6, _Terms6, Qi36} = test_queue_init(),
{Qi37, _SeqIdsGuidsD} = queue_index_publish(SeqIdsD, false, Qi36),
Qi38 = queue_index_deliver(SeqIdsD, Qi37),
Qi39 = rabbit_queue_index:write_acks(SeqIdsD, Qi38),
Qi40 = queue_index_flush_journal(Qi39),
_Qi41 = rabbit_queue_index:terminate_and_erase(Qi40),
ok = stop_msg_store(),
- ok = rabbit_queue_index:start_msg_stores([]),
+ ok = rabbit_variable_queue:start([]),
ok = stop_msg_store(),
passed.
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index bbf78bb7e9..72fba950c7 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -253,7 +253,15 @@
%%----------------------------------------------------------------------------
start(DurableQueues) ->
- rabbit_queue_index:start_msg_stores(DurableQueues).
+ ok = rabbit_msg_store:clean(?TRANSIENT_MSG_STORE, rabbit_mnesia:dir()),
+ {{TransRefs, TransStartFunState}, {PersistRefs, PersistStartFunState}}
+ = rabbit_queue_index:prepare_msg_store_seed_funs(DurableQueues),
+ ok = rabbit_sup:start_child(?TRANSIENT_MSG_STORE, rabbit_msg_store,
+ [?TRANSIENT_MSG_STORE, rabbit_mnesia:dir(),
+ TransRefs, TransStartFunState]),
+ ok = rabbit_sup:start_child(?PERSISTENT_MSG_STORE, rabbit_msg_store,
+ [?PERSISTENT_MSG_STORE, rabbit_mnesia:dir(),
+ PersistRefs, PersistStartFunState]).
init(QueueName, IsDurable) ->
PersistentStore = case IsDurable of
@@ -262,8 +270,12 @@ init(QueueName, IsDurable) ->
end,
MsgStoreRecovered =
rabbit_msg_store:successfully_recovered_state(PersistentStore),
+ ContainsCheckFun =
+ fun (Guid) ->
+ rabbit_msg_store:contains(?PERSISTENT_MSG_STORE, Guid)
+ end,
{DeltaCount, PRef, TRef, Terms, IndexState} =
- rabbit_queue_index:init(QueueName, MsgStoreRecovered),
+ rabbit_queue_index:init(QueueName, MsgStoreRecovered, ContainsCheckFun),
{DeltaSeqId, NextSeqId, IndexState1} =
rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(IndexState),