summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author    Matthew Sackman <matthew@lshift.net>  2010-04-05 16:36:22 +0100
committer Matthew Sackman <matthew@lshift.net>  2010-04-05 16:36:22 +0100
commit   85812ceafa7ecf102c5a05b976a04140624caa19 (patch)
tree     f721af45d65724d3bd5c3266de024be913479176
parent   7c10eef2999807f2a20837e0388a44d15dfcdce6 (diff)
download rabbitmq-server-git-85812ceafa7ecf102c5a05b976a04140624caa19.tar.gz
The msg_store now avoids building the index and scanning files iff it is shutdown cleanly, and all the clients that it previously knew about were also shutdown cleanly and found on startup.
-rw-r--r--src/rabbit_amqqueue.erl7
-rw-r--r--src/rabbit_msg_store.erl149
-rw-r--r--src/rabbit_msg_store_ets_index.erl8
-rw-r--r--src/rabbit_queue_index.erl78
-rw-r--r--src/rabbit_tests.erl55
-rw-r--r--src/rabbit_variable_queue.erl34
6 files changed, 237 insertions, 94 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index c14a28fe11..d23cbd1923 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -129,9 +129,10 @@
start() ->
ok = rabbit_msg_store:clean(?TRANSIENT_MSG_STORE, rabbit_mnesia:dir()),
- ok = rabbit_sup:start_child(?TRANSIENT_MSG_STORE, rabbit_msg_store,
- [?TRANSIENT_MSG_STORE, rabbit_mnesia:dir(),
- fun (ok) -> finished end, ok]),
+ ok = rabbit_sup:start_child(
+ ?TRANSIENT_MSG_STORE, rabbit_msg_store,
+ [?TRANSIENT_MSG_STORE, rabbit_mnesia:dir(), undefined,
+ fun (ok) -> finished end, ok]),
DurableQueues = find_durable_queues(),
ok = rabbit_queue_index:start_persistent_msg_store(DurableQueues),
{ok,_} = supervisor:start_child(
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 3b3df7208e..418b5d5864 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -33,8 +33,9 @@
-behaviour(gen_server2).
--export([start_link/4, write/4, read/3, contains/2, remove/2, release/2,
- sync/3, client_init/1, client_terminate/1, clean/2]).
+-export([start_link/5, write/4, read/3, contains/2, remove/2, release/2,
+ sync/3, client_init/2, client_terminate/1, delete_client/2, clean/2,
+ successfully_recovered_state/1]).
-export([sync/1, gc_done/4, set_maximum_since_use/2,
build_index_worker/6]). %% internal
@@ -42,9 +43,10 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3, handle_pre_hibernate/1]).
--define(SYNC_INTERVAL, 5). %% milliseconds
-
--define(GEOMETRIC_P, 0.3). %% parameter to geometric distribution rng
+-define(SYNC_INTERVAL, 5). %% milliseconds
+-define(GEOMETRIC_P, 0.3). %% parameter to geometric distribution rng
+-define(CLEAN_FILENAME, "clean.dot").
+-define(FILE_SUMMARY_FILENAME, "file_summary.ets").
%%----------------------------------------------------------------------------
@@ -66,7 +68,9 @@
file_handles_ets, %% tid of the shared file handles table
file_summary_ets, %% tid of the file summary table
dedup_cache_ets, %% tid of dedup cache table
- cur_file_cache_ets %% tid of current file cache table
+ cur_file_cache_ets, %% tid of current file cache table
+ client_refs, %% set of references of all registered clients
+ recovered_state %% boolean: did we recover state?
}).
-record(client_msstate,
@@ -98,8 +102,8 @@
dedup_cache_ets :: tid(),
cur_file_cache_ets :: tid() }).
--spec(start_link/4 ::
- (atom(), file_path(),
+-spec(start_link/5 ::
+ (atom(), file_path(), [binary()] | 'undefined',
(fun ((A) -> 'finished' | {msg_id(), non_neg_integer(), A})), A) ->
{'ok', pid()} | 'ignore' | {'error', any()}).
-spec(write/4 :: (server(), msg_id(), msg(), client_msstate()) ->
@@ -112,9 +116,11 @@
-spec(sync/3 :: (server(), [msg_id()], fun (() -> any())) -> 'ok').
-spec(gc_done/4 :: (server(), non_neg_integer(), file_num(), file_num()) -> 'ok').
-spec(set_maximum_since_use/2 :: (server(), non_neg_integer()) -> 'ok').
--spec(client_init/1 :: (server()) -> client_msstate()).
+-spec(client_init/2 :: (server(), binary()) -> client_msstate()).
-spec(client_terminate/1 :: (client_msstate()) -> 'ok').
+-spec(delete_client/2 :: (server(), binary()) -> 'ok').
-spec(clean/2 :: (atom(), file_path()) -> 'ok').
+-spec(successfully_recovered_state/1 :: (server()) -> boolean()).
-endif.
@@ -278,9 +284,9 @@
%% public API
%%----------------------------------------------------------------------------
-start_link(Server, Dir, MsgRefDeltaGen, MsgRefDeltaGenInit) ->
+start_link(Server, Dir, ClientRefs, MsgRefDeltaGen, MsgRefDeltaGenInit) ->
gen_server2:start_link({local, Server}, ?MODULE,
- [Server, Dir, MsgRefDeltaGen, MsgRefDeltaGenInit],
+ [Server, Dir, ClientRefs, MsgRefDeltaGen, MsgRefDeltaGenInit],
[{timeout, infinity}]).
write(Server, MsgId, Msg, CState =
@@ -326,9 +332,10 @@ gc_done(Server, Reclaimed, Source, Destination) ->
set_maximum_since_use(Server, Age) ->
gen_server2:pcast(Server, 8, {set_maximum_since_use, Age}).
-client_init(Server) ->
+client_init(Server, Ref) ->
{IState, IModule, Dir, FileHandlesEts, FileSummaryEts, DedupCacheEts,
- CurFileCacheEts} = gen_server2:call(Server, new_client_state, infinity),
+ CurFileCacheEts} = gen_server2:call(Server, {new_client_state, Ref},
+ infinity),
#client_msstate { file_handle_cache = dict:new(),
index_state = IState,
index_module = IModule,
@@ -342,6 +349,12 @@ client_terminate(CState) ->
close_all_handles(CState),
ok.
+delete_client(Server, Ref) ->
+ ok = gen_server2:call(Server, {delete_client, Ref}, infinity).
+
+successfully_recovered_state(Server) ->
+ gen_server2:call(Server, successfully_recovered_state, infinity).
+
clean(Server, BaseDir) ->
Dir = filename:join(BaseDir, atom_to_list(Server)),
ok = rabbit_misc:recursive_delete(Dir).
@@ -467,7 +480,7 @@ close_all_indicated(#client_msstate { file_handles_ets = FileHandlesEts } =
%% gen_server callbacks
%%----------------------------------------------------------------------------
-init([Server, BaseDir, MsgRefDeltaGen, MsgRefDeltaGenInit]) ->
+init([Server, BaseDir, ClientRefs, MsgRefDeltaGen, MsgRefDeltaGenInit]) ->
ok = file_handle_cache:register_callback(?MODULE, set_maximum_since_use,
[self()]),
@@ -477,12 +490,32 @@ init([Server, BaseDir, MsgRefDeltaGen, MsgRefDeltaGenInit]) ->
{ok, IndexModule} = application:get_env(msg_store_index_module),
rabbit_log:info("Using ~p to provide index for message store~n",
[IndexModule]),
- {fresh, IndexState} = IndexModule:init(fresh, Dir),
+
+ {Recovered, IndexState, ClientRefs1} =
+ case detect_clean_shutdown(Dir) of
+ {false, _Error} ->
+ {fresh, IndexState1} = IndexModule:init(fresh, Dir),
+ {false, IndexState1, sets:new()};
+ {true, Terms} ->
+ case undefined /= ClientRefs andalso lists:sort(ClientRefs) ==
+ lists:sort(proplists:get_value(client_refs, Terms, []))
+ andalso proplists:get_value(index_module, Terms) ==
+ IndexModule of
+ true ->
+ case IndexModule:init(recover, Dir) of
+ {fresh, IndexState1} ->
+ {false, IndexState1, sets:new()};
+ {recovered, IndexState1} ->
+ {true, IndexState1, sets:from_list(ClientRefs)}
+ end;
+ false ->
+ {fresh, IndexState1} = IndexModule:init(fresh, Dir),
+ {false, IndexState1, sets:new()}
+ end
+ end,
InitFile = 0,
- FileSummaryEts = ets:new(rabbit_msg_store_file_summary,
- [ordered_set, public,
- {keypos, #file_summary.file}]),
+ {Recovered1, FileSummaryEts} = recover_file_summary(Recovered, Dir),
DedupCacheEts = ets:new(rabbit_msg_store_dedup_cache, [set, public]),
FileHandlesEts = ets:new(rabbit_msg_store_shared_file_handles,
[ordered_set, public]),
@@ -504,20 +537,23 @@ init([Server, BaseDir, MsgRefDeltaGen, MsgRefDeltaGenInit]) ->
file_handles_ets = FileHandlesEts,
file_summary_ets = FileSummaryEts,
dedup_cache_ets = DedupCacheEts,
- cur_file_cache_ets = CurFileCacheEts
+ cur_file_cache_ets = CurFileCacheEts,
+ client_refs = ClientRefs1,
+ recovered_state = Recovered
},
- ok = count_msg_refs(MsgRefDeltaGen, MsgRefDeltaGenInit, State),
+ ok = count_msg_refs(Recovered, MsgRefDeltaGen, MsgRefDeltaGenInit, State),
FileNames =
sort_file_names(filelib:wildcard("*" ++ ?FILE_EXTENSION, Dir)),
TmpFileNames =
sort_file_names(filelib:wildcard("*" ++ ?FILE_EXTENSION_TMP, Dir)),
ok = recover_crashed_compactions(Dir, FileNames, TmpFileNames),
+
%% There should be no more tmp files now, so go ahead and load the
%% whole lot
Files = [filename_to_num(FileName) || FileName <- FileNames],
{Offset, State1 = #msstate { current_file = CurFile }} =
- build_index(Files, State),
+ build_index(Recovered1, Files, State),
%% read is only needed so that we can seek
{ok, FileHdl} = rabbit_msg_store_misc:open_file(
@@ -543,15 +579,25 @@ handle_call({contains, MsgId}, From, State) ->
State1 = contains_message(MsgId, From, State),
noreply(State1);
-handle_call(new_client_state, _From,
+handle_call({new_client_state, CRef}, _From,
State = #msstate { index_state = IndexState, dir = Dir,
index_module = IndexModule,
file_handles_ets = FileHandlesEts,
file_summary_ets = FileSummaryEts,
dedup_cache_ets = DedupCacheEts,
- cur_file_cache_ets = CurFileCacheEts }) ->
+ cur_file_cache_ets = CurFileCacheEts,
+ client_refs = ClientRefs }) ->
reply({IndexState, IndexModule, Dir, FileHandlesEts, FileSummaryEts,
- DedupCacheEts, CurFileCacheEts}, State).
+ DedupCacheEts, CurFileCacheEts},
+ State #msstate { client_refs = sets:add_element(CRef, ClientRefs) });
+
+handle_call(successfully_recovered_state, _From, State) ->
+ reply(State #msstate.recovered_state, State);
+
+handle_call({delete_client, CRef}, _From,
+ State = #msstate { client_refs = ClientRefs }) ->
+ reply(ok,
+ State #msstate { client_refs = sets:del_element(CRef, ClientRefs) }).
handle_cast({write, MsgId, Msg},
State = #msstate { current_file_handle = CurHdl,
@@ -680,7 +726,9 @@ terminate(_Reason, State = #msstate { index_state = IndexState,
file_handles_ets = FileHandlesEts,
file_summary_ets = FileSummaryEts,
dedup_cache_ets = DedupCacheEts,
- cur_file_cache_ets = CurFileCacheEts }) ->
+ cur_file_cache_ets = CurFileCacheEts,
+ client_refs = ClientRefs,
+ dir = Dir }) ->
%% stop the gc first, otherwise it could be working and we pull
%% out the ets tables from under it.
ok = rabbit_msg_store_gc:stop(GCPid),
@@ -691,11 +739,13 @@ terminate(_Reason, State = #msstate { index_state = IndexState,
State2
end,
State3 = close_all_handles(State1),
- ets:delete(FileSummaryEts),
+ store_file_summary(FileSummaryEts, Dir),
ets:delete(DedupCacheEts),
ets:delete(FileHandlesEts),
ets:delete(CurFileCacheEts),
IndexModule:terminate(IndexState),
+ store_clean_shutdown([{client_refs, sets:to_list(ClientRefs)},
+ {index_module, IndexModule}], Dir),
State3 #msstate { index_state = undefined,
current_file_handle = undefined }.
@@ -957,6 +1007,35 @@ get_read_handle(FileNum, FHC, Dir) ->
{Hdl, dict:store(FileNum, Hdl, FHC) }
end.
+detect_clean_shutdown(Dir) ->
+ Path = filename:join(Dir, ?CLEAN_FILENAME),
+ case rabbit_misc:read_term_file(Path) of
+ {ok, Terms} -> case file:delete(Path) of
+ ok -> {true, Terms};
+ {error, Error} -> {false, Error}
+ end;
+ {error, Error} -> {false, Error}
+ end.
+
+store_clean_shutdown(Terms, Dir) ->
+ rabbit_misc:write_term_file(filename:join(Dir, ?CLEAN_FILENAME), Terms).
+
+recover_file_summary(false, _Dir) ->
+ {false, ets:new(rabbit_msg_store_file_summary,
+ [ordered_set, public, {keypos, #file_summary.file}])};
+recover_file_summary(true, Dir) ->
+ Path = filename:join(Dir, ?FILE_SUMMARY_FILENAME),
+ case ets:file2tab(Path) of
+ {ok, Tid} -> file:delete(Path),
+ {true, Tid};
+ {error, _} -> recover_file_summary(false, Dir)
+ end.
+
+store_file_summary(Tid, Dir) ->
+ ok = ets:tab2file(Tid, filename:join(Dir, ?FILE_SUMMARY_FILENAME),
+ [{extended_info, [object_count]}]),
+ ets:delete(Tid).
+
%%----------------------------------------------------------------------------
%% message cache helper functions
%%----------------------------------------------------------------------------
@@ -1034,6 +1113,11 @@ index_delete_by_file(File, #msstate { index_module = Index,
%% recovery
%%----------------------------------------------------------------------------
+count_msg_refs(false, Gen, Seed, State) ->
+ count_msg_refs(Gen, Seed, State);
+count_msg_refs(true, _Gen, _Seed, _State) ->
+ ok.
+
count_msg_refs(Gen, Seed, State) ->
case Gen(Seed) of
finished -> ok;
@@ -1183,7 +1267,18 @@ find_contiguous_block_prefix([{MsgId, TotalSize, ExpectedOffset} | Tail],
find_contiguous_block_prefix([_MsgAfterGap | _Tail], ExpectedOffset, MsgIds) ->
{ExpectedOffset, MsgIds}.
-build_index(Files, State) ->
+build_index(true, _Files, State =
+ #msstate { file_summary_ets = FileSummaryEts }) ->
+ ets:foldl(
+ fun (#file_summary { valid_total_size = ValidTotalSize,
+ file_size = FileSize, file = File },
+ {_Offset, State1 = #msstate { sum_valid_data = SumValid,
+ sum_file_size = SumFileSize }}) ->
+ {FileSize, State1 #msstate { sum_valid_data = SumValid + ValidTotalSize,
+ sum_file_size = SumFileSize + FileSize,
+ current_file = File }}
+ end, {0, State}, FileSummaryEts);
+build_index(false, Files, State) ->
{ok, Pid} = gatherer:start_link(),
case Files of
[] -> build_index(Pid, undefined, [State #msstate.current_file], State);
diff --git a/src/rabbit_msg_store_ets_index.erl b/src/rabbit_msg_store_ets_index.erl
index f30934c516..d46212ba15 100644
--- a/src/rabbit_msg_store_ets_index.erl
+++ b/src/rabbit_msg_store_ets_index.erl
@@ -37,7 +37,7 @@
delete_by_file/2, terminate/1]).
-define(MSG_LOC_NAME, rabbit_msg_store_ets_index).
--define(FILENAME, msg_store_index.ets).
+-define(FILENAME, "msg_store_index.ets").
-include("rabbit_msg_store_index.hrl").
@@ -48,8 +48,10 @@ init(fresh, Dir) ->
Tid = ets:new(?MSG_LOC_NAME, [set, public, {keypos, #msg_location.msg_id}]),
{fresh, #state { table = Tid, dir = Dir }};
init(recover, Dir) ->
- case ets:file2tab(filename:join(Dir, ?FILENAME)) of
- {ok, Tid} -> {recovered, #state { table = Tid, dir = Dir }};
+ Path = filename:join(Dir, ?FILENAME),
+ case ets:file2tab(Path) of
+ {ok, Tid} -> file:delete(Path),
+ {recovered, #state { table = Tid, dir = Dir }};
{error, _} -> init(fresh, Dir)
end.
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 2a94adf77f..f37d701931 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -31,7 +31,7 @@
-module(rabbit_queue_index).
--export([init/1, terminate/1, terminate_and_erase/1, write_published/4,
+-export([init/1, terminate/2, terminate_and_erase/1, write_published/4,
write_delivered/2, write_acks/2, sync_seq_ids/2, flush_journal/1,
read_segment_entries/2, next_segment_boundary/1, segment_size/0,
find_lowest_seq_id_seg_and_next_seq_id/1,
@@ -195,8 +195,9 @@
dirty_count :: integer()
}).
--spec(init/1 :: (queue_name()) -> {non_neg_integer(), qistate()}).
--spec(terminate/1 :: (qistate()) -> qistate()).
+-spec(init/1 :: (queue_name()) ->
+ {non_neg_integer(), binary(), binary(), qistate()}).
+-spec(terminate/2 :: ([any()], qistate()) -> qistate()).
-spec(terminate_and_erase/1 :: (qistate()) -> qistate()).
-spec(write_published/4 :: (msg_id(), seq_id(), boolean(), qistate())
-> qistate()).
@@ -221,6 +222,19 @@
init(Name) ->
State = blank_state(Name),
+ {PRef, TRef} = case read_shutdown_terms(State #qistate.dir) of
+ {error, _} ->
+ {rabbit_guid:guid(), rabbit_guid:guid()};
+ {ok, Terms} ->
+ case [persistent_ref, transient_ref] --
+ proplists:get_keys(Terms) of
+ [] ->
+ {proplists:get_value(persistent_ref, Terms),
+ proplists:get_value(transient_ref, Terms)};
+ _ ->
+ {rabbit_guid:guid(), rabbit_guid:guid()}
+ end
+ end,
%% 1. Load the journal completely. This will also load segments
%% which have entries in the journal and remove duplicates.
%% The counts will correctly reflect the combination of the
@@ -263,7 +277,8 @@ init(Name) ->
{segment_store(Segment2, Segments2),
CountAcc + PubCount1 - AckCount1, DCountAcc1}
end, {Segments, 0, DCount}, AllSegs),
- {Count, State2 #qistate { segments = Segments1, dirty_count = DCount1 }}.
+ {Count, PRef, TRef,
+ State2 #qistate { segments = Segments1, dirty_count = DCount1 }}.
maybe_add_to_journal( true, true, _Del, _RelSeq, Segment) ->
{Segment, 0};
@@ -276,11 +291,11 @@ maybe_add_to_journal(false, _, del, RelSeq, Segment) ->
maybe_add_to_journal(false, _, _Del, RelSeq, Segment) ->
{add_to_journal(RelSeq, ack, add_to_journal(RelSeq, del, Segment)), 2}.
-terminate(State) ->
- terminate(true, State).
+terminate(Terms, State) ->
+ terminate(true, Terms, State).
terminate_and_erase(State) ->
- State1 = terminate(State),
+ State1 = terminate(false, [], State),
ok = delete_queue_directory(State1 #qistate.dir),
State1.
@@ -397,20 +412,33 @@ start_persistent_msg_store(DurableQueues) ->
[]
end,
DurableDirectories = sets:from_list(dict:fetch_keys(DurableDict)),
- {DurableQueueNames, TransientDirs} =
+ {DurableQueueNames, TransientDirs, DurableRefs} =
lists:foldl(
- fun (QueueDir, {DurableAcc, TransientAcc}) ->
+ fun (QueueDir, {DurableAcc, TransientAcc, RefsAcc}) ->
case sets:is_element(QueueDir, DurableDirectories) of
true ->
+ RefsAcc1 =
+ case read_shutdown_terms(
+ filename:join(QueuesDir, QueueDir)) of
+ {error, _} ->
+ RefsAcc;
+ {ok, Terms} ->
+ case proplists:get_value(
+ persistent_ref, Terms) of
+ undefined -> RefsAcc;
+ Ref -> [Ref | RefsAcc]
+ end
+ end,
{[dict:fetch(QueueDir, DurableDict) | DurableAcc],
- TransientAcc};
+ TransientAcc, RefsAcc1};
false ->
- {DurableAcc, [QueueDir | TransientAcc]}
+ {DurableAcc, [QueueDir | TransientAcc], RefsAcc}
end
- end, {[], []}, Directories),
- ok = rabbit_sup:start_child(?PERSISTENT_MSG_STORE, rabbit_msg_store,
- [?PERSISTENT_MSG_STORE, rabbit_mnesia:dir(),
- fun queue_index_walker/1, DurableQueueNames]),
+ end, {[], [], []}, Directories),
+ ok = rabbit_sup:start_child(
+ ?PERSISTENT_MSG_STORE, rabbit_msg_store,
+ [?PERSISTENT_MSG_STORE, rabbit_mnesia:dir(), DurableRefs,
+ fun queue_index_walker/1, DurableQueueNames]),
lists:foreach(fun (DirName) ->
Dir = filename:join(queues_dir(), DirName),
ok = delete_queue_directory(Dir)
@@ -444,7 +472,7 @@ queue_index_walker_reader(QueueName, Gatherer, Guid) ->
queue_index_walker_reader(Gatherer, Guid, State1, SegNums).
queue_index_walker_reader(Gatherer, Guid, State, []) ->
- _State = terminate(false, State),
+ _State = terminate(false, [], State),
ok = gatherer:finished(Gatherer, Guid);
queue_index_walker_reader(Gatherer, Guid, State, [Seg | SegNums]) ->
SeqId = reconstruct_seq_id(Seg, 0),
@@ -518,11 +546,11 @@ detect_clean_shutdown(Dir) ->
{error, enoent} -> false
end.
-store_clean_shutdown(Dir) ->
- {ok, Hdl} = file_handle_cache:open(filename:join(Dir, ?CLEAN_FILENAME),
- [write, raw, binary],
- [{write_buffer, unbuffered}]),
- ok = file_handle_cache:close(Hdl).
+read_shutdown_terms(Dir) ->
+ rabbit_misc:read_term_file(filename:join(Dir, ?CLEAN_FILENAME)).
+
+store_clean_shutdown(Terms, Dir) ->
+ rabbit_misc:write_term_file(filename:join(Dir, ?CLEAN_FILENAME), Terms).
queue_name_to_dir_name(Name = #resource { kind = queue }) ->
Bin = term_to_binary(Name),
@@ -646,7 +674,9 @@ write_entry_to_segment(RelSeq, {Pub, Del, Ack}, Hdl) ->
end,
Hdl.
-terminate(StoreShutdown, State =
+terminate(_StoreShutdown, _Terms, State = #qistate { segments = undefined }) ->
+ State;
+terminate(StoreShutdown, Terms, State =
#qistate { journal_handle = JournalHdl,
dir = Dir, segments = Segments }) ->
ok = case JournalHdl of
@@ -660,10 +690,10 @@ terminate(StoreShutdown, State =
file_handle_cache:close(Hdl)
end, ok, Segments),
case StoreShutdown of
- true -> store_clean_shutdown(Dir);
+ true -> store_clean_shutdown(Terms, Dir);
false -> ok
end,
- State #qistate { journal_handle = undefined, segments = segments_new() }.
+ State #qistate { journal_handle = undefined, segments = undefined }.
%%----------------------------------------------------------------------------
%% Majors
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 75c66693e3..22473594a0 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -995,16 +995,18 @@ start_msg_store_empty() ->
start_msg_store(fun (ok) -> finished end, ok).
start_msg_store(MsgRefDeltaGen, MsgRefDeltaGenInit) ->
- ok = rabbit_sup:start_child(?PERSISTENT_MSG_STORE, rabbit_msg_store,
- [?PERSISTENT_MSG_STORE, rabbit_mnesia:dir(),
- MsgRefDeltaGen, MsgRefDeltaGenInit]),
+ ok = rabbit_sup:start_child(
+ ?PERSISTENT_MSG_STORE, rabbit_msg_store,
+ [?PERSISTENT_MSG_STORE, rabbit_mnesia:dir(), undefined,
+ MsgRefDeltaGen, MsgRefDeltaGenInit]),
start_transient_msg_store().
start_transient_msg_store() ->
ok = rabbit_msg_store:clean(?TRANSIENT_MSG_STORE, rabbit_mnesia:dir()),
- ok = rabbit_sup:start_child(?TRANSIENT_MSG_STORE, rabbit_msg_store,
- [?TRANSIENT_MSG_STORE, rabbit_mnesia:dir(),
- fun (ok) -> finished end, ok]).
+ ok = rabbit_sup:start_child(
+ ?TRANSIENT_MSG_STORE, rabbit_msg_store,
+ [?TRANSIENT_MSG_STORE, rabbit_mnesia:dir(), undefined,
+ fun (ok) -> finished end, ok]).
stop_msg_store() ->
case supervisor:terminate_child(rabbit_sup, ?PERSISTENT_MSG_STORE) of
@@ -1061,7 +1063,8 @@ test_msg_store() ->
{MsgIds1stHalf, MsgIds2ndHalf} = lists:split(50, MsgIds),
%% check we don't contain any of the msgs we're about to publish
false = msg_store_contains(false, MsgIds),
- MSCState = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE),
+ Ref = rabbit_guid:guid(),
+ MSCState = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref),
%% publish the first half
{ok, MSCState1} = msg_store_write(MsgIds1stHalf, MSCState),
%% sync on the first half
@@ -1135,7 +1138,7 @@ test_msg_store() ->
%% check we don't contain any of the msgs
false = msg_store_contains(false, MsgIds),
%% publish the first half again
- MSCState8 = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE),
+ MSCState8 = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref),
{ok, MSCState9} = msg_store_write(MsgIds1stHalf, MSCState8),
%% this should force some sort of sync internally otherwise misread
ok = rabbit_msg_store:client_terminate(
@@ -1154,7 +1157,7 @@ test_msg_store() ->
{ok, MSCStateM} =
rabbit_msg_store:write(?PERSISTENT_MSG_STORE, MsgId, Payload, MSCStateN),
MSCStateM
- end, rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE), MsgIdsBig)),
+ end, rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref), MsgIdsBig)),
%% now read them to ensure we hit the fast client-side reading
ok = rabbit_msg_store:client_terminate(
lists:foldl(
@@ -1162,7 +1165,7 @@ test_msg_store() ->
{{ok, Payload}, MSCStateN} =
rabbit_msg_store:read(?PERSISTENT_MSG_STORE, MsgId, MSCStateM),
MSCStateN
- end, rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE), MsgIdsBig)),
+ end, rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref), MsgIdsBig)),
%% .., then 3s by 1...
ok = lists:foldl(
fun (MsgId, ok) ->
@@ -1203,21 +1206,27 @@ test_amqqueue(Durable) ->
empty_test_queue() ->
ok = start_transient_msg_store(),
ok = rabbit_queue_index:start_persistent_msg_store([]),
- {0, Qi1} = rabbit_queue_index:init(test_queue()),
+ {0, _PRef, _TRef, Qi1} = rabbit_queue_index:init(test_queue()),
_Qi2 = rabbit_queue_index:terminate_and_erase(Qi1),
ok.
queue_index_publish(SeqIds, Persistent, Qi) ->
+ Ref = rabbit_guid:guid(),
+ MsgStore = case Persistent of
+ true -> ?PERSISTENT_MSG_STORE;
+ false -> ?TRANSIENT_MSG_STORE
+ end,
{A, B, MSCStateEnd} =
lists:foldl(
fun (SeqId, {QiN, SeqIdsMsgIdsAcc, MSCStateN}) ->
MsgId = rabbit_guid:guid(),
QiM = rabbit_queue_index:write_published(MsgId, SeqId, Persistent,
QiN),
- {ok, MSCStateM} = rabbit_msg_store:write(?PERSISTENT_MSG_STORE, MsgId,
+ {ok, MSCStateM} = rabbit_msg_store:write(MsgStore, MsgId,
MsgId, MSCStateN),
{QiM, [{SeqId, MsgId} | SeqIdsMsgIdsAcc], MSCStateM}
- end, {Qi, [], rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE)}, SeqIds),
+ end, {Qi, [], rabbit_msg_store:client_init(MsgStore, Ref)}, SeqIds),
+ ok = rabbit_msg_store:delete_client(MsgStore, Ref),
ok = rabbit_msg_store:client_terminate(MSCStateEnd),
{A, B}.
@@ -1246,7 +1255,7 @@ test_queue_index() ->
ok = empty_test_queue(),
SeqIdsA = lists:seq(0,9999),
SeqIdsB = lists:seq(10000,19999),
- {0, Qi0} = rabbit_queue_index:init(test_queue()),
+ {0, _PRef, _TRef, Qi0} = rabbit_queue_index:init(test_queue()),
{0, 0, Qi1} =
rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi0),
{Qi2, SeqIdsMsgIdsA} = queue_index_publish(SeqIdsA, false, Qi1),
@@ -1256,12 +1265,12 @@ test_queue_index() ->
ok = verify_read_with_published(false, false, ReadA,
lists:reverse(SeqIdsMsgIdsA)),
%% call terminate twice to prove it's idempotent
- _Qi5 = rabbit_queue_index:terminate(rabbit_queue_index:terminate(Qi4)),
+ _Qi5 = rabbit_queue_index:terminate([], rabbit_queue_index:terminate([], Qi4)),
ok = stop_msg_store(),
ok = rabbit_queue_index:start_persistent_msg_store([test_amqqueue(true)]),
ok = start_transient_msg_store(),
%% should get length back as 0, as all the msgs were transient
- {0, Qi6} = rabbit_queue_index:init(test_queue()),
+ {0, _PRef1, _TRef1, Qi6} = rabbit_queue_index:init(test_queue()),
{0, SegSize, Qi7} =
rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi6),
{Qi8, SeqIdsMsgIdsB} = queue_index_publish(SeqIdsB, true, Qi7),
@@ -1270,13 +1279,13 @@ test_queue_index() ->
{ReadB, Qi10} = rabbit_queue_index:read_segment_entries(0, Qi9),
ok = verify_read_with_published(false, true, ReadB,
lists:reverse(SeqIdsMsgIdsB)),
- _Qi11 = rabbit_queue_index:terminate(Qi10),
+ _Qi11 = rabbit_queue_index:terminate([], Qi10),
ok = stop_msg_store(),
ok = rabbit_queue_index:start_persistent_msg_store([test_amqqueue(true)]),
ok = start_transient_msg_store(),
%% should get length back as 10000
LenB = length(SeqIdsB),
- {LenB, Qi12} = rabbit_queue_index:init(test_queue()),
+ {LenB, _PRef2, _TRef2, Qi12} = rabbit_queue_index:init(test_queue()),
{0, TwoSegs, Qi13} =
rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi12),
Qi14 = queue_index_deliver(SeqIdsB, Qi13),
@@ -1288,12 +1297,12 @@ test_queue_index() ->
%% Everything will have gone now because #pubs == #acks
{0, 0, Qi18} =
rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi17),
- _Qi19 = rabbit_queue_index:terminate(Qi18),
+ _Qi19 = rabbit_queue_index:terminate([], Qi18),
ok = stop_msg_store(),
ok = rabbit_queue_index:start_persistent_msg_store([test_amqqueue(true)]),
ok = start_transient_msg_store(),
%% should get length back as 0 because all persistent msgs have been acked
- {0, Qi20} = rabbit_queue_index:init(test_queue()),
+ {0, _PRef3, _TRef3, Qi20} = rabbit_queue_index:init(test_queue()),
_Qi21 = rabbit_queue_index:terminate_and_erase(Qi20),
ok = stop_msg_store(),
ok = empty_test_queue(),
@@ -1302,7 +1311,7 @@ test_queue_index() ->
%% First, partials:
%% a) partial pub+del+ack, then move to new segment
SeqIdsC = lists:seq(0,trunc(SegmentSize/2)),
- {0, Qi22} = rabbit_queue_index:init(test_queue()),
+ {0, _PRef4, _TRef4, Qi22} = rabbit_queue_index:init(test_queue()),
{Qi23, _SeqIdsMsgIdsC} = queue_index_publish(SeqIdsC, false, Qi22),
Qi24 = queue_index_deliver(SeqIdsC, Qi23),
Qi25 = rabbit_queue_index:write_acks(SeqIdsC, Qi24),
@@ -1313,7 +1322,7 @@ test_queue_index() ->
ok = empty_test_queue(),
%% b) partial pub+del, then move to new segment, then ack all in old segment
- {0, Qi29} = rabbit_queue_index:init(test_queue()),
+ {0, _PRef5, _TRef5, Qi29} = rabbit_queue_index:init(test_queue()),
{Qi30, _SeqIdsMsgIdsC2} = queue_index_publish(SeqIdsC, false, Qi29),
Qi31 = queue_index_deliver(SeqIdsC, Qi30),
{Qi32, _SeqIdsMsgIdsC3} = queue_index_publish([SegmentSize], false, Qi31),
@@ -1325,7 +1334,7 @@ test_queue_index() ->
%% c) just fill up several segments of all pubs, then +dels, then +acks
SeqIdsD = lists:seq(0,SegmentSize*4),
- {0, Qi36} = rabbit_queue_index:init(test_queue()),
+ {0, _PRef6, _TRef6, Qi36} = rabbit_queue_index:init(test_queue()),
{Qi37, _SeqIdsMsgIdsD} = queue_index_publish(SeqIdsD, false, Qi36),
Qi38 = queue_index_deliver(SeqIdsD, Qi37),
Qi39 = rabbit_queue_index:write_acks(SeqIdsD, Qi38),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 70ebd074eb..03db8510db 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -211,7 +211,7 @@
rate_timestamp :: {integer(), integer(), integer()},
len :: non_neg_integer(),
on_sync :: {[ack()], [msg_id()], [{pid(), any()}]},
- msg_store_clients :: {any(), any()},
+ msg_store_clients :: {{any(), binary()}, {any(), binary()}},
persistent_store :: pid() | atom()
}).
@@ -256,7 +256,7 @@
%%----------------------------------------------------------------------------
init(QueueName, PersistentStore) ->
- {DeltaCount, IndexState} =
+ {DeltaCount, PRef, TRef, IndexState} =
rabbit_queue_index:init(QueueName),
{DeltaSeqId, NextSeqId, IndexState1} =
rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(IndexState),
@@ -287,17 +287,20 @@ init(QueueName, PersistentStore) ->
rate_timestamp = Now,
len = DeltaCount,
on_sync = {[], [], []},
- msg_store_clients = {rabbit_msg_store:client_init(PersistentStore),
- rabbit_msg_store:client_init(?TRANSIENT_MSG_STORE)},
+ msg_store_clients = {
+ {rabbit_msg_store:client_init(PersistentStore, PRef), PRef},
+ {rabbit_msg_store:client_init(?TRANSIENT_MSG_STORE, TRef), TRef}},
persistent_store = PersistentStore
},
maybe_deltas_to_betas(State).
-terminate(State = #vqstate { index_state = IndexState,
- msg_store_clients = {MSCStateP, MSCStateT} }) ->
+terminate(State = #vqstate {
+ index_state = IndexState,
+ msg_store_clients = {{MSCStateP, PRef}, {MSCStateT, TRef}} }) ->
rabbit_msg_store:client_terminate(MSCStateP),
rabbit_msg_store:client_terminate(MSCStateT),
- State #vqstate { index_state = rabbit_queue_index:terminate(IndexState) }.
+ Terms = [{persistent_ref, PRef}, {transient_ref, TRef}],
+ State #vqstate { index_state = rabbit_queue_index:terminate(Terms, IndexState) }.
publish(Msg, State) ->
State1 = limit_ram_index(State),
@@ -466,9 +469,10 @@ purge(State = #vqstate { q4 = Q4, index_state = IndexState, len = Len,
%% the only difference between purge and delete is that delete also
%% needs to delete everything that's been delivered and not ack'd.
delete_and_terminate(State) ->
- {_PurgeCount, State1 = #vqstate { index_state = IndexState,
- msg_store_clients = {MSCStateP, MSCStateT},
- persistent_store = PersistentStore }} =
+ {_PurgeCount, State1 = #vqstate {
+ index_state = IndexState,
+ msg_store_clients = {{MSCStateP, PRef}, {MSCStateT, TRef}},
+ persistent_store = PersistentStore }} =
purge(State),
IndexState1 =
case rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(
@@ -482,6 +486,8 @@ delete_and_terminate(State) ->
IndexState3
end,
IndexState4 = rabbit_queue_index:terminate_and_erase(IndexState1),
+ rabbit_msg_store:delete_client(PersistentStore, PRef),
+ rabbit_msg_store:delete_client(?TRANSIENT_MSG_STORE, TRef),
rabbit_msg_store:client_terminate(MSCStateP),
rabbit_msg_store:client_terminate(MSCStateT),
State1 #vqstate { index_state = IndexState4 }.
@@ -969,14 +975,14 @@ store_beta_entry(MsgStatus = #msg_status { msg_on_disk = true,
find_msg_store(true, PersistentStore) -> PersistentStore;
find_msg_store(false, _PersistentStore) -> ?TRANSIENT_MSG_STORE.
-with_msg_store_state(PersistentStore, {MSCStateP, MSCStateT}, true,
+with_msg_store_state(PersistentStore, {{MSCStateP, PRef}, MSCStateT}, true,
Fun) ->
{Result, MSCStateP1} = Fun(PersistentStore, MSCStateP),
- {Result, {MSCStateP1, MSCStateT}};
-with_msg_store_state(_PersistentStore, {MSCStateP, MSCStateT}, false,
+ {Result, {{MSCStateP1, PRef}, MSCStateT}};
+with_msg_store_state(_PersistentStore, {MSCStateP, {MSCStateT, TRef}}, false,
Fun) ->
{Result, MSCStateT1} = Fun(?TRANSIENT_MSG_STORE, MSCStateT),
- {Result, {MSCStateP, MSCStateT1}}.
+ {Result, {MSCStateP, {MSCStateT1, TRef}}}.
read_from_msg_store(PersistentStore, MSCState, IsPersistent, MsgId) ->
with_msg_store_state(