diff options
| author | Matthew Sackman <matthew@lshift.net> | 2010-04-05 16:36:22 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2010-04-05 16:36:22 +0100 |
| commit | 85812ceafa7ecf102c5a05b976a04140624caa19 (patch) | |
| tree | f721af45d65724d3bd5c3266de024be913479176 | |
| parent | 7c10eef2999807f2a20837e0388a44d15dfcdce6 (diff) | |
| download | rabbitmq-server-git-85812ceafa7ecf102c5a05b976a04140624caa19.tar.gz | |
The msg_store now avoids building the index and scanning files iff it is shutdown cleanly, and all the clients that it previously knew about were also shutdown cleanly and found on startup.
| -rw-r--r-- | src/rabbit_amqqueue.erl | 7 | ||||
| -rw-r--r-- | src/rabbit_msg_store.erl | 149 | ||||
| -rw-r--r-- | src/rabbit_msg_store_ets_index.erl | 8 | ||||
| -rw-r--r-- | src/rabbit_queue_index.erl | 78 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 55 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 34 |
6 files changed, 237 insertions, 94 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index c14a28fe11..d23cbd1923 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -129,9 +129,10 @@ start() -> ok = rabbit_msg_store:clean(?TRANSIENT_MSG_STORE, rabbit_mnesia:dir()), - ok = rabbit_sup:start_child(?TRANSIENT_MSG_STORE, rabbit_msg_store, - [?TRANSIENT_MSG_STORE, rabbit_mnesia:dir(), - fun (ok) -> finished end, ok]), + ok = rabbit_sup:start_child( + ?TRANSIENT_MSG_STORE, rabbit_msg_store, + [?TRANSIENT_MSG_STORE, rabbit_mnesia:dir(), undefined, + fun (ok) -> finished end, ok]), DurableQueues = find_durable_queues(), ok = rabbit_queue_index:start_persistent_msg_store(DurableQueues), {ok,_} = supervisor:start_child( diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 3b3df7208e..418b5d5864 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -33,8 +33,9 @@ -behaviour(gen_server2). --export([start_link/4, write/4, read/3, contains/2, remove/2, release/2, - sync/3, client_init/1, client_terminate/1, clean/2]). +-export([start_link/5, write/4, read/3, contains/2, remove/2, release/2, + sync/3, client_init/2, client_terminate/1, delete_client/2, clean/2, + successfully_recovered_state/1]). -export([sync/1, gc_done/4, set_maximum_since_use/2, build_index_worker/6]). %% internal @@ -42,9 +43,10 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3, handle_pre_hibernate/1]). --define(SYNC_INTERVAL, 5). %% milliseconds - --define(GEOMETRIC_P, 0.3). %% parameter to geometric distribution rng +-define(SYNC_INTERVAL, 5). %% milliseconds +-define(GEOMETRIC_P, 0.3). %% parameter to geometric distribution rng +-define(CLEAN_FILENAME, "clean.dot"). +-define(FILE_SUMMARY_FILENAME, "file_summary.ets"). %%---------------------------------------------------------------------------- @@ -66,7 +68,9 @@ file_handles_ets, %% tid of the shared file handles table file_summary_ets, %% tid of the file summary table dedup_cache_ets, %% tid of dedup cache table - cur_file_cache_ets %% tid of current file cache table + cur_file_cache_ets, %% tid of current file cache table + client_refs, %% set of references of all registered clients + recovered_state %% boolean: did we recover state? }). -record(client_msstate, @@ -98,8 +102,8 @@ dedup_cache_ets :: tid(), cur_file_cache_ets :: tid() }). --spec(start_link/4 :: - (atom(), file_path(), +-spec(start_link/5 :: + (atom(), file_path(), [binary()] | 'undefined', (fun ((A) -> 'finished' | {msg_id(), non_neg_integer(), A})), A) -> {'ok', pid()} | 'ignore' | {'error', any()}). -spec(write/4 :: (server(), msg_id(), msg(), client_msstate()) -> @@ -112,9 +116,11 @@ -spec(sync/3 :: (server(), [msg_id()], fun (() -> any())) -> 'ok'). -spec(gc_done/4 :: (server(), non_neg_integer(), file_num(), file_num()) -> 'ok'). -spec(set_maximum_since_use/2 :: (server(), non_neg_integer()) -> 'ok'). --spec(client_init/1 :: (server()) -> client_msstate()). +-spec(client_init/2 :: (server(), binary()) -> client_msstate()). -spec(client_terminate/1 :: (client_msstate()) -> 'ok'). +-spec(delete_client/2 :: (server(), binary()) -> 'ok'). -spec(clean/2 :: (atom(), file_path()) -> 'ok'). +-spec(successfully_recovered_state/1 :: (server()) -> boolean()). -endif. @@ -278,9 +284,9 @@ %% public API %%---------------------------------------------------------------------------- -start_link(Server, Dir, MsgRefDeltaGen, MsgRefDeltaGenInit) -> +start_link(Server, Dir, ClientRefs, MsgRefDeltaGen, MsgRefDeltaGenInit) -> gen_server2:start_link({local, Server}, ?MODULE, - [Server, Dir, MsgRefDeltaGen, MsgRefDeltaGenInit], + [Server, Dir, ClientRefs, MsgRefDeltaGen, MsgRefDeltaGenInit], [{timeout, infinity}]). write(Server, MsgId, Msg, CState = @@ -326,9 +332,10 @@ gc_done(Server, Reclaimed, Source, Destination) -> set_maximum_since_use(Server, Age) -> gen_server2:pcast(Server, 8, {set_maximum_since_use, Age}). -client_init(Server) -> +client_init(Server, Ref) -> {IState, IModule, Dir, FileHandlesEts, FileSummaryEts, DedupCacheEts, - CurFileCacheEts} = gen_server2:call(Server, new_client_state, infinity), + CurFileCacheEts} = gen_server2:call(Server, {new_client_state, Ref}, + infinity), #client_msstate { file_handle_cache = dict:new(), index_state = IState, index_module = IModule, @@ -342,6 +349,12 @@ client_terminate(CState) -> close_all_handles(CState), ok. +delete_client(Server, Ref) -> + ok = gen_server2:call(Server, {delete_client, Ref}, infinity). + +successfully_recovered_state(Server) -> + gen_server2:call(Server, successfully_recovered_state, infinity). + clean(Server, BaseDir) -> Dir = filename:join(BaseDir, atom_to_list(Server)), ok = rabbit_misc:recursive_delete(Dir). @@ -467,7 +480,7 @@ close_all_indicated(#client_msstate { file_handles_ets = FileHandlesEts } = %% gen_server callbacks %%---------------------------------------------------------------------------- -init([Server, BaseDir, MsgRefDeltaGen, MsgRefDeltaGenInit]) -> +init([Server, BaseDir, ClientRefs, MsgRefDeltaGen, MsgRefDeltaGenInit]) -> ok = file_handle_cache:register_callback(?MODULE, set_maximum_since_use, [self()]), @@ -477,12 +490,32 @@ init([Server, BaseDir, MsgRefDeltaGen, MsgRefDeltaGenInit]) -> {ok, IndexModule} = application:get_env(msg_store_index_module), rabbit_log:info("Using ~p to provide index for message store~n", [IndexModule]), - {fresh, IndexState} = IndexModule:init(fresh, Dir), + + {Recovered, IndexState, ClientRefs1} = + case detect_clean_shutdown(Dir) of + {false, _Error} -> + {fresh, IndexState1} = IndexModule:init(fresh, Dir), + {false, IndexState1, sets:new()}; + {true, Terms} -> + case undefined /= ClientRefs andalso lists:sort(ClientRefs) == + lists:sort(proplists:get_value(client_refs, Terms, [])) + andalso proplists:get_value(index_module, Terms) == + IndexModule of + true -> + case IndexModule:init(recover, Dir) of + {fresh, IndexState1} -> + {false, IndexState1, sets:new()}; + {recovered, IndexState1} -> + {true, IndexState1, sets:from_list(ClientRefs)} + end; + false -> + {fresh, IndexState1} = IndexModule:init(fresh, Dir), + {false, IndexState1, sets:new()} + end + end, InitFile = 0, - FileSummaryEts = ets:new(rabbit_msg_store_file_summary, - [ordered_set, public, - {keypos, #file_summary.file}]), + {Recovered1, FileSummaryEts} = recover_file_summary(Recovered, Dir), DedupCacheEts = ets:new(rabbit_msg_store_dedup_cache, [set, public]), FileHandlesEts = ets:new(rabbit_msg_store_shared_file_handles, [ordered_set, public]), @@ -504,20 +537,23 @@ init([Server, BaseDir, MsgRefDeltaGen, MsgRefDeltaGenInit]) -> file_handles_ets = FileHandlesEts, file_summary_ets = FileSummaryEts, dedup_cache_ets = DedupCacheEts, - cur_file_cache_ets = CurFileCacheEts + cur_file_cache_ets = CurFileCacheEts, + client_refs = ClientRefs1, + recovered_state = Recovered }, - ok = count_msg_refs(MsgRefDeltaGen, MsgRefDeltaGenInit, State), + ok = count_msg_refs(Recovered, MsgRefDeltaGen, MsgRefDeltaGenInit, State), FileNames = sort_file_names(filelib:wildcard("*" ++ ?FILE_EXTENSION, Dir)), TmpFileNames = sort_file_names(filelib:wildcard("*" ++ ?FILE_EXTENSION_TMP, Dir)), ok = recover_crashed_compactions(Dir, FileNames, TmpFileNames), + %% There should be no more tmp files now, so go ahead and load the %% whole lot Files = [filename_to_num(FileName) || FileName <- FileNames], {Offset, State1 = #msstate { current_file = CurFile }} = - build_index(Files, State), + build_index(Recovered1, Files, State), %% read is only needed so that we can seek {ok, FileHdl} = rabbit_msg_store_misc:open_file( @@ -543,15 +579,25 @@ handle_call({contains, MsgId}, From, State) -> State1 = contains_message(MsgId, From, State), noreply(State1); -handle_call(new_client_state, _From, +handle_call({new_client_state, CRef}, _From, State = #msstate { index_state = IndexState, dir = Dir, index_module = IndexModule, file_handles_ets = FileHandlesEts, file_summary_ets = FileSummaryEts, dedup_cache_ets = DedupCacheEts, - cur_file_cache_ets = CurFileCacheEts }) -> + cur_file_cache_ets = CurFileCacheEts, + client_refs = ClientRefs }) -> reply({IndexState, IndexModule, Dir, FileHandlesEts, FileSummaryEts, - DedupCacheEts, CurFileCacheEts}, State). + DedupCacheEts, CurFileCacheEts}, + State #msstate { client_refs = sets:add_element(CRef, ClientRefs) }); + +handle_call(successfully_recovered_state, _From, State) -> + reply(State #msstate.recovered_state, State); + +handle_call({delete_client, CRef}, _From, + State = #msstate { client_refs = ClientRefs }) -> + reply(ok, + State #msstate { client_refs = sets:del_element(CRef, ClientRefs) }). handle_cast({write, MsgId, Msg}, State = #msstate { current_file_handle = CurHdl, @@ -680,7 +726,9 @@ terminate(_Reason, State = #msstate { index_state = IndexState, file_handles_ets = FileHandlesEts, file_summary_ets = FileSummaryEts, dedup_cache_ets = DedupCacheEts, - cur_file_cache_ets = CurFileCacheEts }) -> + cur_file_cache_ets = CurFileCacheEts, + client_refs = ClientRefs, + dir = Dir }) -> %% stop the gc first, otherwise it could be working and we pull %% out the ets tables from under it. ok = rabbit_msg_store_gc:stop(GCPid), @@ -691,11 +739,13 @@ terminate(_Reason, State = #msstate { index_state = IndexState, State2 end, State3 = close_all_handles(State1), - ets:delete(FileSummaryEts), + store_file_summary(FileSummaryEts, Dir), ets:delete(DedupCacheEts), ets:delete(FileHandlesEts), ets:delete(CurFileCacheEts), IndexModule:terminate(IndexState), + store_clean_shutdown([{client_refs, sets:to_list(ClientRefs)}, + {index_module, IndexModule}], Dir), State3 #msstate { index_state = undefined, current_file_handle = undefined }. @@ -957,6 +1007,35 @@ get_read_handle(FileNum, FHC, Dir) -> {Hdl, dict:store(FileNum, Hdl, FHC) } end. +detect_clean_shutdown(Dir) -> + Path = filename:join(Dir, ?CLEAN_FILENAME), + case rabbit_misc:read_term_file(Path) of + {ok, Terms} -> case file:delete(Path) of + ok -> {true, Terms}; + {error, Error} -> {false, Error} + end; + {error, Error} -> {false, Error} + end. + +store_clean_shutdown(Terms, Dir) -> + rabbit_misc:write_term_file(filename:join(Dir, ?CLEAN_FILENAME), Terms). + +recover_file_summary(false, _Dir) -> + {false, ets:new(rabbit_msg_store_file_summary, + [ordered_set, public, {keypos, #file_summary.file}])}; +recover_file_summary(true, Dir) -> + Path = filename:join(Dir, ?FILE_SUMMARY_FILENAME), + case ets:file2tab(Path) of + {ok, Tid} -> file:delete(Path), + {true, Tid}; + {error, _} -> recover_file_summary(false, Dir) + end. + +store_file_summary(Tid, Dir) -> + ok = ets:tab2file(Tid, filename:join(Dir, ?FILE_SUMMARY_FILENAME), + [{extended_info, [object_count]}]), + ets:delete(Tid). + %%---------------------------------------------------------------------------- %% message cache helper functions %%---------------------------------------------------------------------------- @@ -1034,6 +1113,11 @@ index_delete_by_file(File, #msstate { index_module = Index, %% recovery %%---------------------------------------------------------------------------- +count_msg_refs(false, Gen, Seed, State) -> + count_msg_refs(Gen, Seed, State); +count_msg_refs(true, _Gen, _Seed, _State) -> + ok. + count_msg_refs(Gen, Seed, State) -> case Gen(Seed) of finished -> ok; @@ -1183,7 +1267,18 @@ find_contiguous_block_prefix([{MsgId, TotalSize, ExpectedOffset} | Tail], find_contiguous_block_prefix([_MsgAfterGap | _Tail], ExpectedOffset, MsgIds) -> {ExpectedOffset, MsgIds}. -build_index(Files, State) -> +build_index(true, _Files, State = + #msstate { file_summary_ets = FileSummaryEts }) -> + ets:foldl( + fun (#file_summary { valid_total_size = ValidTotalSize, + file_size = FileSize, file = File }, + {_Offset, State1 = #msstate { sum_valid_data = SumValid, + sum_file_size = SumFileSize }}) -> + {FileSize, State1 #msstate { sum_valid_data = SumValid + ValidTotalSize, + sum_file_size = SumFileSize + FileSize, + current_file = File }} + end, {0, State}, FileSummaryEts); +build_index(false, Files, State) -> {ok, Pid} = gatherer:start_link(), case Files of [] -> build_index(Pid, undefined, [State #msstate.current_file], State); diff --git a/src/rabbit_msg_store_ets_index.erl b/src/rabbit_msg_store_ets_index.erl index f30934c516..d46212ba15 100644 --- a/src/rabbit_msg_store_ets_index.erl +++ b/src/rabbit_msg_store_ets_index.erl @@ -37,7 +37,7 @@ delete_by_file/2, terminate/1]). -define(MSG_LOC_NAME, rabbit_msg_store_ets_index). --define(FILENAME, msg_store_index.ets). +-define(FILENAME, "msg_store_index.ets"). -include("rabbit_msg_store_index.hrl"). @@ -48,8 +48,10 @@ init(fresh, Dir) -> Tid = ets:new(?MSG_LOC_NAME, [set, public, {keypos, #msg_location.msg_id}]), {fresh, #state { table = Tid, dir = Dir }}; init(recover, Dir) -> - case ets:file2tab(filename:join(Dir, ?FILENAME)) of - {ok, Tid} -> {recovered, #state { table = Tid, dir = Dir }}; + Path = filename:join(Dir, ?FILENAME), + case ets:file2tab(Path) of + {ok, Tid} -> file:delete(Path), + {recovered, #state { table = Tid, dir = Dir }}; {error, _} -> init(fresh, Dir) end. diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 2a94adf77f..f37d701931 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -31,7 +31,7 @@ -module(rabbit_queue_index). --export([init/1, terminate/1, terminate_and_erase/1, write_published/4, +-export([init/1, terminate/2, terminate_and_erase/1, write_published/4, write_delivered/2, write_acks/2, sync_seq_ids/2, flush_journal/1, read_segment_entries/2, next_segment_boundary/1, segment_size/0, find_lowest_seq_id_seg_and_next_seq_id/1, @@ -195,8 +195,9 @@ dirty_count :: integer() }). --spec(init/1 :: (queue_name()) -> {non_neg_integer(), qistate()}). --spec(terminate/1 :: (qistate()) -> qistate()). +-spec(init/1 :: (queue_name()) -> + {non_neg_integer(), binary(), binary(), qistate()}). +-spec(terminate/2 :: ([any()], qistate()) -> qistate()). -spec(terminate_and_erase/1 :: (qistate()) -> qistate()). -spec(write_published/4 :: (msg_id(), seq_id(), boolean(), qistate()) -> qistate()). @@ -221,6 +222,19 @@ init(Name) -> State = blank_state(Name), + {PRef, TRef} = case read_shutdown_terms(State #qistate.dir) of + {error, _} -> + {rabbit_guid:guid(), rabbit_guid:guid()}; + {ok, Terms} -> + case [persistent_ref, transient_ref] -- + proplists:get_keys(Terms) of + [] -> + {proplists:get_value(persistent_ref, Terms), + proplists:get_value(transient_ref, Terms)}; + _ -> + {rabbit_guid:guid(), rabbit_guid:guid()} + end + end, %% 1. Load the journal completely. This will also load segments %% which have entries in the journal and remove duplicates. %% The counts will correctly reflect the combination of the @@ -263,7 +277,8 @@ init(Name) -> {segment_store(Segment2, Segments2), CountAcc + PubCount1 - AckCount1, DCountAcc1} end, {Segments, 0, DCount}, AllSegs), - {Count, State2 #qistate { segments = Segments1, dirty_count = DCount1 }}. + {Count, PRef, TRef, + State2 #qistate { segments = Segments1, dirty_count = DCount1 }}. maybe_add_to_journal( true, true, _Del, _RelSeq, Segment) -> {Segment, 0}; @@ -276,11 +291,11 @@ maybe_add_to_journal(false, _, del, RelSeq, Segment) -> maybe_add_to_journal(false, _, _Del, RelSeq, Segment) -> {add_to_journal(RelSeq, ack, add_to_journal(RelSeq, del, Segment)), 2}. -terminate(State) -> - terminate(true, State). +terminate(Terms, State) -> + terminate(true, Terms, State). terminate_and_erase(State) -> - State1 = terminate(State), + State1 = terminate(false, [], State), ok = delete_queue_directory(State1 #qistate.dir), State1. @@ -397,20 +412,33 @@ start_persistent_msg_store(DurableQueues) -> [] end, DurableDirectories = sets:from_list(dict:fetch_keys(DurableDict)), - {DurableQueueNames, TransientDirs} = + {DurableQueueNames, TransientDirs, DurableRefs} = lists:foldl( - fun (QueueDir, {DurableAcc, TransientAcc}) -> + fun (QueueDir, {DurableAcc, TransientAcc, RefsAcc}) -> case sets:is_element(QueueDir, DurableDirectories) of true -> + RefsAcc1 = + case read_shutdown_terms( + filename:join(QueuesDir, QueueDir)) of + {error, _} -> + RefsAcc; + {ok, Terms} -> + case proplists:get_value( + persistent_ref, Terms) of + undefined -> RefsAcc; + Ref -> [Ref | RefsAcc] + end + end, {[dict:fetch(QueueDir, DurableDict) | DurableAcc], - TransientAcc}; + TransientAcc, RefsAcc1}; false -> - {DurableAcc, [QueueDir | TransientAcc]} + {DurableAcc, [QueueDir | TransientAcc], RefsAcc} end - end, {[], []}, Directories), - ok = rabbit_sup:start_child(?PERSISTENT_MSG_STORE, rabbit_msg_store, - [?PERSISTENT_MSG_STORE, rabbit_mnesia:dir(), - fun queue_index_walker/1, DurableQueueNames]), + end, {[], [], []}, Directories), + ok = rabbit_sup:start_child( + ?PERSISTENT_MSG_STORE, rabbit_msg_store, + [?PERSISTENT_MSG_STORE, rabbit_mnesia:dir(), DurableRefs, + fun queue_index_walker/1, DurableQueueNames]), lists:foreach(fun (DirName) -> Dir = filename:join(queues_dir(), DirName), ok = delete_queue_directory(Dir) @@ -444,7 +472,7 @@ queue_index_walker_reader(QueueName, Gatherer, Guid) -> queue_index_walker_reader(Gatherer, Guid, State1, SegNums). queue_index_walker_reader(Gatherer, Guid, State, []) -> - _State = terminate(false, State), + _State = terminate(false, [], State), ok = gatherer:finished(Gatherer, Guid); queue_index_walker_reader(Gatherer, Guid, State, [Seg | SegNums]) -> SeqId = reconstruct_seq_id(Seg, 0), @@ -518,11 +546,11 @@ detect_clean_shutdown(Dir) -> {error, enoent} -> false end. -store_clean_shutdown(Dir) -> - {ok, Hdl} = file_handle_cache:open(filename:join(Dir, ?CLEAN_FILENAME), - [write, raw, binary], - [{write_buffer, unbuffered}]), - ok = file_handle_cache:close(Hdl). +read_shutdown_terms(Dir) -> + rabbit_misc:read_term_file(filename:join(Dir, ?CLEAN_FILENAME)). + +store_clean_shutdown(Terms, Dir) -> + rabbit_misc:write_term_file(filename:join(Dir, ?CLEAN_FILENAME), Terms). queue_name_to_dir_name(Name = #resource { kind = queue }) -> Bin = term_to_binary(Name), @@ -646,7 +674,9 @@ write_entry_to_segment(RelSeq, {Pub, Del, Ack}, Hdl) -> end, Hdl. -terminate(StoreShutdown, State = +terminate(_StoreShutdown, _Terms, State = #qistate { segments = undefined }) -> + State; +terminate(StoreShutdown, Terms, State = #qistate { journal_handle = JournalHdl, dir = Dir, segments = Segments }) -> ok = case JournalHdl of @@ -660,10 +690,10 @@ terminate(StoreShutdown, State = file_handle_cache:close(Hdl) end, ok, Segments), case StoreShutdown of - true -> store_clean_shutdown(Dir); + true -> store_clean_shutdown(Terms, Dir); false -> ok end, - State #qistate { journal_handle = undefined, segments = segments_new() }. + State #qistate { journal_handle = undefined, segments = undefined }. %%---------------------------------------------------------------------------- %% Majors diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 75c66693e3..22473594a0 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -995,16 +995,18 @@ start_msg_store_empty() -> start_msg_store(fun (ok) -> finished end, ok). start_msg_store(MsgRefDeltaGen, MsgRefDeltaGenInit) -> - ok = rabbit_sup:start_child(?PERSISTENT_MSG_STORE, rabbit_msg_store, - [?PERSISTENT_MSG_STORE, rabbit_mnesia:dir(), - MsgRefDeltaGen, MsgRefDeltaGenInit]), + ok = rabbit_sup:start_child( + ?PERSISTENT_MSG_STORE, rabbit_msg_store, + [?PERSISTENT_MSG_STORE, rabbit_mnesia:dir(), undefined, + MsgRefDeltaGen, MsgRefDeltaGenInit]), start_transient_msg_store(). start_transient_msg_store() -> ok = rabbit_msg_store:clean(?TRANSIENT_MSG_STORE, rabbit_mnesia:dir()), - ok = rabbit_sup:start_child(?TRANSIENT_MSG_STORE, rabbit_msg_store, - [?TRANSIENT_MSG_STORE, rabbit_mnesia:dir(), - fun (ok) -> finished end, ok]). + ok = rabbit_sup:start_child( + ?TRANSIENT_MSG_STORE, rabbit_msg_store, + [?TRANSIENT_MSG_STORE, rabbit_mnesia:dir(), undefined, + fun (ok) -> finished end, ok]). stop_msg_store() -> case supervisor:terminate_child(rabbit_sup, ?PERSISTENT_MSG_STORE) of @@ -1061,7 +1063,8 @@ test_msg_store() -> {MsgIds1stHalf, MsgIds2ndHalf} = lists:split(50, MsgIds), %% check we don't contain any of the msgs we're about to publish false = msg_store_contains(false, MsgIds), - MSCState = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE), + Ref = rabbit_guid:guid(), + MSCState = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref), %% publish the first half {ok, MSCState1} = msg_store_write(MsgIds1stHalf, MSCState), %% sync on the first half @@ -1135,7 +1138,7 @@ test_msg_store() -> %% check we don't contain any of the msgs false = msg_store_contains(false, MsgIds), %% publish the first half again - MSCState8 = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE), + MSCState8 = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref), {ok, MSCState9} = msg_store_write(MsgIds1stHalf, MSCState8), %% this should force some sort of sync internally otherwise misread ok = rabbit_msg_store:client_terminate( @@ -1154,7 +1157,7 @@ test_msg_store() -> {ok, MSCStateM} = rabbit_msg_store:write(?PERSISTENT_MSG_STORE, MsgId, Payload, MSCStateN), MSCStateM - end, rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE), MsgIdsBig)), + end, rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref), MsgIdsBig)), %% now read them to ensure we hit the fast client-side reading ok = rabbit_msg_store:client_terminate( lists:foldl( @@ -1162,7 +1165,7 @@ test_msg_store() -> {{ok, Payload}, MSCStateN} = rabbit_msg_store:read(?PERSISTENT_MSG_STORE, MsgId, MSCStateM), MSCStateN - end, rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE), MsgIdsBig)), + end, rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref), MsgIdsBig)), %% .., then 3s by 1... ok = lists:foldl( fun (MsgId, ok) -> @@ -1203,21 +1206,27 @@ test_amqqueue(Durable) -> empty_test_queue() -> ok = start_transient_msg_store(), ok = rabbit_queue_index:start_persistent_msg_store([]), - {0, Qi1} = rabbit_queue_index:init(test_queue()), + {0, _PRef, _TRef, Qi1} = rabbit_queue_index:init(test_queue()), _Qi2 = rabbit_queue_index:terminate_and_erase(Qi1), ok. queue_index_publish(SeqIds, Persistent, Qi) -> + Ref = rabbit_guid:guid(), + MsgStore = case Persistent of + true -> ?PERSISTENT_MSG_STORE; + false -> ?TRANSIENT_MSG_STORE + end, {A, B, MSCStateEnd} = lists:foldl( fun (SeqId, {QiN, SeqIdsMsgIdsAcc, MSCStateN}) -> MsgId = rabbit_guid:guid(), QiM = rabbit_queue_index:write_published(MsgId, SeqId, Persistent, QiN), - {ok, MSCStateM} = rabbit_msg_store:write(?PERSISTENT_MSG_STORE, MsgId, + {ok, MSCStateM} = rabbit_msg_store:write(MsgStore, MsgId, MsgId, MSCStateN), {QiM, [{SeqId, MsgId} | SeqIdsMsgIdsAcc], MSCStateM} - end, {Qi, [], rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE)}, SeqIds), + end, {Qi, [], rabbit_msg_store:client_init(MsgStore, Ref)}, SeqIds), + ok = rabbit_msg_store:delete_client(MsgStore, Ref), ok = rabbit_msg_store:client_terminate(MSCStateEnd), {A, B}. @@ -1246,7 +1255,7 @@ test_queue_index() -> ok = empty_test_queue(), SeqIdsA = lists:seq(0,9999), SeqIdsB = lists:seq(10000,19999), - {0, Qi0} = rabbit_queue_index:init(test_queue()), + {0, _PRef, _TRef, Qi0} = rabbit_queue_index:init(test_queue()), {0, 0, Qi1} = rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi0), {Qi2, SeqIdsMsgIdsA} = queue_index_publish(SeqIdsA, false, Qi1), @@ -1256,12 +1265,12 @@ test_queue_index() -> ok = verify_read_with_published(false, false, ReadA, lists:reverse(SeqIdsMsgIdsA)), %% call terminate twice to prove it's idempotent - _Qi5 = rabbit_queue_index:terminate(rabbit_queue_index:terminate(Qi4)), + _Qi5 = rabbit_queue_index:terminate([], rabbit_queue_index:terminate([], Qi4)), ok = stop_msg_store(), ok = rabbit_queue_index:start_persistent_msg_store([test_amqqueue(true)]), ok = start_transient_msg_store(), %% should get length back as 0, as all the msgs were transient - {0, Qi6} = rabbit_queue_index:init(test_queue()), + {0, _PRef1, _TRef1, Qi6} = rabbit_queue_index:init(test_queue()), {0, SegSize, Qi7} = rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi6), {Qi8, SeqIdsMsgIdsB} = queue_index_publish(SeqIdsB, true, Qi7), @@ -1270,13 +1279,13 @@ test_queue_index() -> {ReadB, Qi10} = rabbit_queue_index:read_segment_entries(0, Qi9), ok = verify_read_with_published(false, true, ReadB, lists:reverse(SeqIdsMsgIdsB)), - _Qi11 = rabbit_queue_index:terminate(Qi10), + _Qi11 = rabbit_queue_index:terminate([], Qi10), ok = stop_msg_store(), ok = rabbit_queue_index:start_persistent_msg_store([test_amqqueue(true)]), ok = start_transient_msg_store(), %% should get length back as 10000 LenB = length(SeqIdsB), - {LenB, Qi12} = rabbit_queue_index:init(test_queue()), + {LenB, _PRef2, _TRef2, Qi12} = rabbit_queue_index:init(test_queue()), {0, TwoSegs, Qi13} = rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi12), Qi14 = queue_index_deliver(SeqIdsB, Qi13), @@ -1288,12 +1297,12 @@ test_queue_index() -> %% Everything will have gone now because #pubs == #acks {0, 0, Qi18} = rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi17), - _Qi19 = rabbit_queue_index:terminate(Qi18), + _Qi19 = rabbit_queue_index:terminate([], Qi18), ok = stop_msg_store(), ok = rabbit_queue_index:start_persistent_msg_store([test_amqqueue(true)]), ok = start_transient_msg_store(), %% should get length back as 0 because all persistent msgs have been acked - {0, Qi20} = rabbit_queue_index:init(test_queue()), + {0, _PRef3, _TRef3, Qi20} = rabbit_queue_index:init(test_queue()), _Qi21 = rabbit_queue_index:terminate_and_erase(Qi20), ok = stop_msg_store(), ok = empty_test_queue(), @@ -1302,7 +1311,7 @@ test_queue_index() -> %% First, partials: %% a) partial pub+del+ack, then move to new segment SeqIdsC = lists:seq(0,trunc(SegmentSize/2)), - {0, Qi22} = rabbit_queue_index:init(test_queue()), + {0, _PRef4, _TRef4, Qi22} = rabbit_queue_index:init(test_queue()), {Qi23, _SeqIdsMsgIdsC} = queue_index_publish(SeqIdsC, false, Qi22), Qi24 = queue_index_deliver(SeqIdsC, Qi23), Qi25 = rabbit_queue_index:write_acks(SeqIdsC, Qi24), @@ -1313,7 +1322,7 @@ test_queue_index() -> ok = empty_test_queue(), %% b) partial pub+del, then move to new segment, then ack all in old segment - {0, Qi29} = rabbit_queue_index:init(test_queue()), + {0, _PRef5, _TRef5, Qi29} = rabbit_queue_index:init(test_queue()), {Qi30, _SeqIdsMsgIdsC2} = queue_index_publish(SeqIdsC, false, Qi29), Qi31 = queue_index_deliver(SeqIdsC, Qi30), {Qi32, _SeqIdsMsgIdsC3} = queue_index_publish([SegmentSize], false, Qi31), @@ -1325,7 +1334,7 @@ test_queue_index() -> %% c) just fill up several segments of all pubs, then +dels, then +acks SeqIdsD = lists:seq(0,SegmentSize*4), - {0, Qi36} = rabbit_queue_index:init(test_queue()), + {0, _PRef6, _TRef6, Qi36} = rabbit_queue_index:init(test_queue()), {Qi37, _SeqIdsMsgIdsD} = queue_index_publish(SeqIdsD, false, Qi36), Qi38 = queue_index_deliver(SeqIdsD, Qi37), Qi39 = rabbit_queue_index:write_acks(SeqIdsD, Qi38), diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 70ebd074eb..03db8510db 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -211,7 +211,7 @@ rate_timestamp :: {integer(), integer(), integer()}, len :: non_neg_integer(), on_sync :: {[ack()], [msg_id()], [{pid(), any()}]}, - msg_store_clients :: {any(), any()}, + msg_store_clients :: {{any(), binary()}, {any(), binary()}}, persistent_store :: pid() | atom() }). @@ -256,7 +256,7 @@ %%---------------------------------------------------------------------------- init(QueueName, PersistentStore) -> - {DeltaCount, IndexState} = + {DeltaCount, PRef, TRef, IndexState} = rabbit_queue_index:init(QueueName), {DeltaSeqId, NextSeqId, IndexState1} = rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(IndexState), @@ -287,17 +287,20 @@ init(QueueName, PersistentStore) -> rate_timestamp = Now, len = DeltaCount, on_sync = {[], [], []}, - msg_store_clients = {rabbit_msg_store:client_init(PersistentStore), - rabbit_msg_store:client_init(?TRANSIENT_MSG_STORE)}, + msg_store_clients = { + {rabbit_msg_store:client_init(PersistentStore, PRef), PRef}, + {rabbit_msg_store:client_init(?TRANSIENT_MSG_STORE, TRef), TRef}}, persistent_store = PersistentStore }, maybe_deltas_to_betas(State). -terminate(State = #vqstate { index_state = IndexState, - msg_store_clients = {MSCStateP, MSCStateT} }) -> +terminate(State = #vqstate { + index_state = IndexState, + msg_store_clients = {{MSCStateP, PRef}, {MSCStateT, TRef}} }) -> rabbit_msg_store:client_terminate(MSCStateP), rabbit_msg_store:client_terminate(MSCStateT), - State #vqstate { index_state = rabbit_queue_index:terminate(IndexState) }. + Terms = [{persistent_ref, PRef}, {transient_ref, TRef}], + State #vqstate { index_state = rabbit_queue_index:terminate(Terms, IndexState) }. publish(Msg, State) -> State1 = limit_ram_index(State), @@ -466,9 +469,10 @@ purge(State = #vqstate { q4 = Q4, index_state = IndexState, len = Len, %% the only difference between purge and delete is that delete also %% needs to delete everything that's been delivered and not ack'd. delete_and_terminate(State) -> - {_PurgeCount, State1 = #vqstate { index_state = IndexState, - msg_store_clients = {MSCStateP, MSCStateT}, - persistent_store = PersistentStore }} = + {_PurgeCount, State1 = #vqstate { + index_state = IndexState, + msg_store_clients = {{MSCStateP, PRef}, {MSCStateT, TRef}}, + persistent_store = PersistentStore }} = purge(State), IndexState1 = case rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id( @@ -482,6 +486,8 @@ delete_and_terminate(State) -> IndexState3 end, IndexState4 = rabbit_queue_index:terminate_and_erase(IndexState1), + rabbit_msg_store:delete_client(PersistentStore, PRef), + rabbit_msg_store:delete_client(?TRANSIENT_MSG_STORE, TRef), rabbit_msg_store:client_terminate(MSCStateP), rabbit_msg_store:client_terminate(MSCStateT), State1 #vqstate { index_state = IndexState4 }. @@ -969,14 +975,14 @@ store_beta_entry(MsgStatus = #msg_status { msg_on_disk = true, find_msg_store(true, PersistentStore) -> PersistentStore; find_msg_store(false, _PersistentStore) -> ?TRANSIENT_MSG_STORE. -with_msg_store_state(PersistentStore, {MSCStateP, MSCStateT}, true, +with_msg_store_state(PersistentStore, {{MSCStateP, PRef}, MSCStateT}, true, Fun) -> {Result, MSCStateP1} = Fun(PersistentStore, MSCStateP), - {Result, {MSCStateP1, MSCStateT}}; -with_msg_store_state(_PersistentStore, {MSCStateP, MSCStateT}, false, + {Result, {{MSCStateP1, PRef}, MSCStateT}}; +with_msg_store_state(_PersistentStore, {MSCStateP, {MSCStateT, TRef}}, false, Fun) -> {Result, MSCStateT1} = Fun(?TRANSIENT_MSG_STORE, MSCStateT), - {Result, {MSCStateP, MSCStateT1}}. + {Result, {MSCStateP, {MSCStateT1, TRef}}}. read_from_msg_store(PersistentStore, MSCState, IsPersistent, MsgId) -> with_msg_store_state( |
