summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2010-04-01 19:08:04 +0100
committerMatthew Sackman <matthew@lshift.net>2010-04-01 19:08:04 +0100
commit72519fca997e48195c1ef56e26597fa3395c99d1 (patch)
treec1eb0a97ece1edc59ce5d5388d66530fc3c93cba
parentae47f65d16cb229e4298aea435e543f1ccd55f16 (diff)
downloadrabbitmq-server-git-72519fca997e48195c1ef56e26597fa3395c99d1.tar.gz
Split msg_store into two msg stores, one for persistent and one for transient. This is the first step in trying to make startup and recovery of data on disk much faster.
-rw-r--r--include/rabbit.hrl3
-rw-r--r--include/rabbit_msg_store.hrl5
-rw-r--r--src/rabbit_amqqueue.erl8
-rw-r--r--src/rabbit_msg_store.erl489
-rw-r--r--src/rabbit_msg_store_gc.erl45
-rw-r--r--src/rabbit_queue_index.erl17
-rw-r--r--src/rabbit_sup.erl7
-rw-r--r--src/rabbit_tests.erl131
-rw-r--r--src/rabbit_variable_queue.erl205
9 files changed, 532 insertions, 378 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index df282029e5..e9fa6e376c 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -177,6 +177,9 @@
-define(MAX_WAIT, 16#ffffffff).
+-define(PERSISTENT_MSG_STORE, msg_store_persistent).
+-define(TRANSIENT_MSG_STORE, msg_store_transient).
+
-ifdef(debug).
-define(LOGDEBUG0(F), rabbit_log:debug(F)).
-define(LOGDEBUG(F,A), rabbit_log:debug(F,A)).
diff --git a/include/rabbit_msg_store.hrl b/include/rabbit_msg_store.hrl
index 6f557c1835..2c2735d483 100644
--- a/include/rabbit_msg_store.hrl
+++ b/include/rabbit_msg_store.hrl
@@ -49,8 +49,3 @@
-define(FILE_SIZE_LIMIT, (16*1024*1024)).
-define(HANDLE_CACHE_BUFFER_SIZE, 1048576). %% 1MB
-
--define(FILE_SUMMARY_ETS_NAME, rabbit_msg_store_file_summary).
--define(CACHE_ETS_NAME, rabbit_msg_store_cache).
--define(FILE_HANDLES_ETS_NAME, rabbit_msg_store_file_handles).
--define(CUR_FILE_CACHE_ETS_NAME, rabbit_msg_store_cur_file).
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 9899354022..f0540c93a2 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -65,8 +65,7 @@
-type(qfun(A) :: fun ((amqqueue()) -> A)).
-type(ok_or_errors() ::
'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}).
--type(seq_id() :: non_neg_integer()).
--type(acktag() :: ('ack_not_on_disk' | {'ack_index_and_store', msg_id(), seq_id()})).
+-type(acktag() :: any()).
-spec(start/0 :: () -> 'ok').
-spec(declare/4 :: (queue_name(), boolean(), boolean(), amqp_table()) ->
@@ -129,8 +128,11 @@
%%----------------------------------------------------------------------------
start() ->
+ ok = rabbit_sup:start_child(?TRANSIENT_MSG_STORE, rabbit_msg_store,
+ [?TRANSIENT_MSG_STORE, rabbit_mnesia:dir(),
+ fun (ok) -> finished end, ok]),
DurableQueues = find_durable_queues(),
- ok = rabbit_queue_index:start_msg_store(DurableQueues),
+ ok = rabbit_queue_index:start_persistent_msg_store(DurableQueues),
{ok,_} = supervisor:start_child(
rabbit_sup,
{rabbit_amqqueue_sup,
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 86f1d9c936..a33b1a3470 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -33,16 +33,14 @@
-behaviour(gen_server2).
--export([start_link/3, write/2, read/2, contains/1, remove/1, release/1,
- sync/2, client_init/0, client_terminate/1]).
+-export([start_link/4, write/4, read/3, contains/2, remove/2, release/2,
+ sync/3, client_init/1, client_terminate/1]).
--export([sync/0, gc_done/3, set_maximum_since_use/1]). %% internal
+-export([sync/1, gc_done/4, set_maximum_since_use/2]). %% internal
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3, handle_pre_hibernate/1]).
--define(SERVER, ?MODULE).
-
-define(SYNC_INTERVAL, 5). %% milliseconds
-define(GEOMETRIC_P, 0.3). %% parameter to geometric distribution rng
@@ -62,44 +60,58 @@
sum_valid_data, %% sum of valid data in all files
sum_file_size, %% sum of file sizes
pending_gc_completion, %% things to do once GC completes
- gc_active %% is the GC currently working?
+ gc_active, %% is the GC currently working?
+ gc_pid, %% pid of our GC
+ file_handles_ets, %% tid of the shared file handles table
+ file_summary_ets, %% tid of the file summary table
+ dedup_cache_ets, %% tid of dedup cache table
+ cur_file_cache_ets %% tid of current file cache table
}).
-record(client_msstate,
{ file_handle_cache,
index_state,
index_module,
- dir
+ dir,
+ file_handles_ets,
+ file_summary_ets,
+ dedup_cache_ets,
+ cur_file_cache_ets
}).
%%----------------------------------------------------------------------------
-ifdef(use_specs).
+-type(server() :: pid() | atom()).
-type(msg_id() :: binary()).
-type(msg() :: any()).
-type(file_path() :: any()).
-type(file_num() :: non_neg_integer()).
--type(client_msstate() :: #client_msstate { file_handle_cache :: dict(),
- index_state :: any(),
- index_module :: atom(),
- dir :: file_path() }).
-
--spec(start_link/3 ::
- (file_path(),
+-type(client_msstate() :: #client_msstate { file_handle_cache :: dict(),
+ index_state :: any(),
+ index_module :: atom(),
+ dir :: file_path(),
+ file_handles_ets :: tid(),
+ file_summary_ets :: tid(),
+ dedup_cache_ets :: tid(),
+ cur_file_cache_ets :: tid() }).
+
+-spec(start_link/4 ::
+ (atom(), file_path(),
(fun ((A) -> 'finished' | {msg_id(), non_neg_integer(), A})), A) ->
{'ok', pid()} | 'ignore' | {'error', any()}).
--spec(write/2 :: (msg_id(), msg()) -> 'ok').
-%% -spec(read/1 :: (msg_id()) -> {'ok', msg()} | 'not_found').
--spec(read/2 :: (msg_id(), client_msstate()) ->
+-spec(write/4 :: (server(), msg_id(), msg(), client_msstate()) ->
+ {'ok', client_msstate()}).
+-spec(read/3 :: (server(), msg_id(), client_msstate()) ->
{{'ok', msg()} | 'not_found', client_msstate()}).
--spec(contains/1 :: (msg_id()) -> boolean()).
--spec(remove/1 :: ([msg_id()]) -> 'ok').
--spec(release/1 :: ([msg_id()]) -> 'ok').
--spec(sync/2 :: ([msg_id()], fun (() -> any())) -> 'ok').
--spec(gc_done/3 :: (non_neg_integer(), file_num(), file_num()) -> 'ok').
--spec(set_maximum_since_use/1 :: (non_neg_integer()) -> 'ok').
--spec(client_init/0 :: () -> client_msstate()).
+-spec(contains/2 :: (server(), msg_id()) -> boolean()).
+-spec(remove/2 :: (server(), [msg_id()]) -> 'ok').
+-spec(release/2 :: (server(), [msg_id()]) -> 'ok').
+-spec(sync/3 :: (server(), [msg_id()], fun (() -> any())) -> 'ok').
+-spec(gc_done/4 :: (server(), non_neg_integer(), file_num(), file_num()) -> 'ok').
+-spec(set_maximum_since_use/2 :: (server(), non_neg_integer()) -> 'ok').
+-spec(client_init/1 :: (server()) -> client_msstate()).
-spec(client_terminate/1 :: (client_msstate()) -> 'ok').
-endif.
@@ -264,28 +276,32 @@
%% public API
%%----------------------------------------------------------------------------
-start_link(Dir, MsgRefDeltaGen, MsgRefDeltaGenInit) ->
- gen_server2:start_link({local, ?SERVER}, ?MODULE,
- [Dir, MsgRefDeltaGen, MsgRefDeltaGenInit],
+start_link(Server, Dir, MsgRefDeltaGen, MsgRefDeltaGenInit) ->
+ gen_server2:start_link({local, Server}, ?MODULE,
+ [Server, Dir, MsgRefDeltaGen, MsgRefDeltaGenInit],
[{timeout, infinity}]).
-write(MsgId, Msg) ->
- ok = add_to_cache(MsgId, Msg),
- gen_server2:cast(?SERVER, {write, MsgId, Msg}).
+write(Server, MsgId, Msg, CState =
+ #client_msstate { cur_file_cache_ets = CurFileCacheEts }) ->
+ ok = add_to_cache(CurFileCacheEts, MsgId, Msg),
+ {gen_server2:cast(Server, {write, MsgId, Msg}), CState}.
-read(MsgId, CState) ->
+read(Server, MsgId, CState =
+ #client_msstate { dedup_cache_ets = DedupCacheEts,
+ cur_file_cache_ets = CurFileCacheEts }) ->
%% 1. Check the dedup cache
- case fetch_and_increment_cache(MsgId) of
+ case fetch_and_increment_cache(DedupCacheEts, MsgId) of
not_found ->
%% 2. Check the cur file cache
- case ets:lookup(?CUR_FILE_CACHE_ETS_NAME, MsgId) of
+ case ets:lookup(CurFileCacheEts, MsgId) of
[] ->
Defer = fun() -> {gen_server2:pcall(
- ?SERVER, 2, {read, MsgId}, infinity),
+ Server, 2, {read, MsgId}, infinity),
CState} end,
case index_lookup(MsgId, CState) of
not_found -> Defer();
- MsgLocation -> client_read1(MsgLocation, Defer, CState)
+ MsgLocation -> client_read1(Server, MsgLocation, Defer,
+ CState)
end;
[{MsgId, Msg, _CacheRefCount}] ->
%% Although we've found it, we don't know the
@@ -296,25 +312,29 @@ read(MsgId, CState) ->
{{ok, Msg}, CState}
end.
-contains(MsgId) -> gen_server2:call(?SERVER, {contains, MsgId}, infinity).
-remove(MsgIds) -> gen_server2:cast(?SERVER, {remove, MsgIds}).
-release(MsgIds) -> gen_server2:cast(?SERVER, {release, MsgIds}).
-sync(MsgIds, K) -> gen_server2:cast(?SERVER, {sync, MsgIds, K}).
-sync() -> gen_server2:pcast(?SERVER, 8, sync). %% internal
-
-gc_done(Reclaimed, Source, Destination) ->
- gen_server2:pcast(?SERVER, 8, {gc_done, Reclaimed, Source, Destination}).
-
-set_maximum_since_use(Age) ->
- gen_server2:pcast(?SERVER, 8, {set_maximum_since_use, Age}).
-
-client_init() ->
- {IState, IModule, Dir} =
- gen_server2:call(?SERVER, new_client_state, infinity),
- #client_msstate { file_handle_cache = dict:new(),
- index_state = IState,
- index_module = IModule,
- dir = Dir }.
+contains(Server, MsgId) -> gen_server2:call(Server, {contains, MsgId}, infinity).
+remove(Server, MsgIds) -> gen_server2:cast(Server, {remove, MsgIds}).
+release(Server, MsgIds) -> gen_server2:cast(Server, {release, MsgIds}).
+sync(Server, MsgIds, K) -> gen_server2:cast(Server, {sync, MsgIds, K}).
+sync(Server) -> gen_server2:pcast(Server, 8, sync). %% internal
+
+gc_done(Server, Reclaimed, Source, Destination) ->
+ gen_server2:pcast(Server, 8, {gc_done, Reclaimed, Source, Destination}).
+
+set_maximum_since_use(Server, Age) ->
+ gen_server2:pcast(Server, 8, {set_maximum_since_use, Age}).
+
+client_init(Server) ->
+ {IState, IModule, Dir, FileHandlesEts, FileSummaryEts, DedupCacheEts,
+ CurFileCacheEts} = gen_server2:call(Server, new_client_state, infinity),
+ #client_msstate { file_handle_cache = dict:new(),
+ index_state = IState,
+ index_module = IModule,
+ dir = Dir,
+ file_handles_ets = FileHandlesEts,
+ file_summary_ets = FileSummaryEts,
+ dedup_cache_ets = DedupCacheEts,
+ cur_file_cache_ets = CurFileCacheEts }.
client_terminate(CState) ->
close_all_handles(CState),
@@ -324,53 +344,58 @@ client_terminate(CState) ->
%% Client-side-only helpers
%%----------------------------------------------------------------------------
-add_to_cache(MsgId, Msg) ->
- case ets:insert_new(?CUR_FILE_CACHE_ETS_NAME, {MsgId, Msg, 1}) of
+add_to_cache(CurFileCacheEts, MsgId, Msg) ->
+ case ets:insert_new(CurFileCacheEts, {MsgId, Msg, 1}) of
true ->
ok;
false ->
try
- ets:update_counter(?CUR_FILE_CACHE_ETS_NAME, MsgId, {3, +1}),
+ ets:update_counter(CurFileCacheEts, MsgId, {3, +1}),
ok
- catch error:badarg -> add_to_cache(MsgId, Msg)
+ catch error:badarg -> add_to_cache(CurFileCacheEts, MsgId, Msg)
end
end.
-client_read1(MsgLocation = #msg_location { msg_id = MsgId, file = File }, Defer,
- CState) ->
- case ets:lookup(?FILE_SUMMARY_ETS_NAME, File) of
+client_read1(Server, #msg_location { msg_id = MsgId, file = File } =
+ MsgLocation, Defer, CState =
+ #client_msstate { file_summary_ets = FileSummaryEts }) ->
+ case ets:lookup(FileSummaryEts, File) of
[] -> %% File has been GC'd and no longer exists. Go around again.
- read(MsgId, CState);
+ read(Server, MsgId, CState);
[#file_summary { locked = Locked, right = Right }] ->
- client_read2(Locked, Right, MsgLocation, Defer, CState)
+ client_read2(Server, Locked, Right, MsgLocation, Defer, CState)
end.
-client_read2(false, undefined, #msg_location {
- msg_id = MsgId, ref_count = RefCount }, Defer, CState) ->
- case ets:lookup(?CUR_FILE_CACHE_ETS_NAME, MsgId) of
+client_read2(_Server, false, undefined,
+ #msg_location { msg_id = MsgId, ref_count = RefCount }, Defer,
+ CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts,
+ dedup_cache_ets = DedupCacheEts }) ->
+ case ets:lookup(CurFileCacheEts, MsgId) of
[] ->
Defer(); %% may have rolled over
[{MsgId, Msg, _CacheRefCount}] ->
- ok = maybe_insert_into_cache(RefCount, MsgId, Msg),
+ ok = maybe_insert_into_cache(DedupCacheEts, RefCount, MsgId, Msg),
{{ok, Msg}, CState}
end;
-client_read2(true, _Right, _MsgLocation, Defer, _CState) ->
+client_read2(_Server, true, _Right, _MsgLocation, Defer, _CState) ->
%% Of course, in the mean time, the GC could have run and our msg
%% is actually in a different file, unlocked. However, defering is
%% the safest and simplest thing to do.
Defer();
-client_read2(false, _Right, #msg_location {
- msg_id = MsgId, ref_count = RefCount, file = File },
- Defer, CState) ->
+client_read2(Server, false, _Right,
+ #msg_location { msg_id = MsgId, ref_count = RefCount, file = File },
+ Defer, CState =
+ #client_msstate { file_handles_ets = FileHandlesEts,
+ file_summary_ets = FileSummaryEts,
+ dedup_cache_ets = DedupCacheEts }) ->
%% It's entirely possible that everything we're doing from here on
%% is for the wrong file, or a non-existent file, as a GC may have
%% finished.
- try ets:update_counter(?FILE_SUMMARY_ETS_NAME, File,
- {#file_summary.readers, +1})
+ try ets:update_counter(FileSummaryEts, File, {#file_summary.readers, +1})
catch error:badarg -> %% the File has been GC'd and deleted. Go around.
- read(MsgId, CState)
+ read(Server, MsgId, CState)
end,
- Release = fun() -> ets:update_counter(?FILE_SUMMARY_ETS_NAME, File,
+ Release = fun() -> ets:update_counter(FileSummaryEts, File,
{#file_summary.readers, -1})
end,
%% If a GC hasn't already started, it won't start now. Need to
@@ -378,7 +403,7 @@ client_read2(false, _Right, #msg_location {
%% between lookup and update_counter (thus GC started before our
%% +1).
[#file_summary { locked = Locked }] =
- ets:lookup(?FILE_SUMMARY_ETS_NAME, File),
+ ets:lookup(FileSummaryEts, File),
case Locked of
true ->
%% If we get a badarg here, then the GC has finished and
@@ -390,7 +415,7 @@ client_read2(false, _Right, #msg_location {
%% readers, msg_store ets:deletes (and unlocks the dest)
try Release(),
Defer()
- catch error:badarg -> read(MsgId, CState)
+ catch error:badarg -> read(Server, MsgId, CState)
end;
false ->
%% Ok, we're definitely safe to continue - a GC can't
@@ -410,23 +435,25 @@ client_read2(false, _Right, #msg_location {
MsgLocation = #msg_location { file = File } ->
%% Still the same file.
%% This is fine to fail (already exists)
- ets:insert_new(
- ?FILE_HANDLES_ETS_NAME, {{self(), File}, open}),
+ ets:insert_new(FileHandlesEts, {{self(), File}, open}),
CState1 = close_all_indicated(CState),
- {Msg, CState2} = read_from_disk(MsgLocation, CState1),
- ok = maybe_insert_into_cache(RefCount, MsgId, Msg),
+ {Msg, CState2} =
+ read_from_disk(MsgLocation, CState1, DedupCacheEts),
+ ok = maybe_insert_into_cache(DedupCacheEts, RefCount, MsgId,
+ Msg),
Release(), %% this MUST NOT fail with badarg
{{ok, Msg}, CState2};
MsgLocation -> %% different file!
Release(), %% this MUST NOT fail with badarg
- client_read1(MsgLocation, Defer, CState)
+ client_read1(Server, MsgLocation, Defer, CState)
end
end.
-close_all_indicated(CState) ->
- Objs = ets:match_object(?FILE_HANDLES_ETS_NAME, {{self(), '_'}, close}),
+close_all_indicated(#client_msstate { file_handles_ets = FileHandlesEts } =
+ CState) ->
+ Objs = ets:match_object(FileHandlesEts, {{self(), '_'}, close}),
lists:foldl(fun ({Key = {_Self, File}, close}, CStateM) ->
- true = ets:delete(?FILE_HANDLES_ETS_NAME, Key),
+ true = ets:delete(FileHandlesEts, Key),
close_handle(File, CStateM)
end, CState, Objs).
@@ -434,12 +461,13 @@ close_all_indicated(CState) ->
%% gen_server callbacks
%%----------------------------------------------------------------------------
-init([Dir, MsgRefDeltaGen, MsgRefDeltaGenInit]) ->
+init([Server, BaseDir, MsgRefDeltaGen, MsgRefDeltaGenInit]) ->
process_flag(trap_exit, true),
- ok =
- file_handle_cache:register_callback(?MODULE, set_maximum_since_use, []),
+ ok = file_handle_cache:register_callback(?MODULE, set_maximum_since_use,
+ [self()]),
+ Dir = filename:join(BaseDir, atom_to_list(Server)),
ok = filelib:ensure_dir(filename:join(Dir, "nothing")),
{ok, IndexModule} = application:get_env(msg_store_index_module),
@@ -448,28 +476,32 @@ init([Dir, MsgRefDeltaGen, MsgRefDeltaGenInit]) ->
IndexState = IndexModule:init(Dir),
InitFile = 0,
- ?FILE_SUMMARY_ETS_NAME = ets:new(?FILE_SUMMARY_ETS_NAME,
- [ordered_set, public, named_table,
- {keypos, #file_summary.file}]),
- ?CACHE_ETS_NAME = ets:new(?CACHE_ETS_NAME, [set, public, named_table]),
- ?FILE_HANDLES_ETS_NAME = ets:new(?FILE_HANDLES_ETS_NAME,
- [ordered_set, public, named_table]),
- ?CUR_FILE_CACHE_ETS_NAME = ets:new(?CUR_FILE_CACHE_ETS_NAME,
- [set, public, named_table]),
- State =
- #msstate { dir = Dir,
- index_module = IndexModule,
- index_state = IndexState,
- current_file = InitFile,
- current_file_handle = undefined,
- file_handle_cache = dict:new(),
- on_sync = [],
- sync_timer_ref = undefined,
- sum_valid_data = 0,
- sum_file_size = 0,
- pending_gc_completion = [],
- gc_active = false
- },
+ FileSummaryEts = ets:new(rabbit_msg_store_file_summary,
+ [ordered_set, public,
+ {keypos, #file_summary.file}]),
+ DedupCacheEts = ets:new(rabbit_msg_store_dedup_cache, [set, public]),
+ FileHandlesEts = ets:new(rabbit_msg_store_shared_file_handles,
+ [ordered_set, public]),
+ CurFileCacheEts = ets:new(rabbit_msg_store_cur_file, [set, public]),
+
+ State = #msstate { dir = Dir,
+ index_module = IndexModule,
+ index_state = IndexState,
+ current_file = InitFile,
+ current_file_handle = undefined,
+ file_handle_cache = dict:new(),
+ on_sync = [],
+ sync_timer_ref = undefined,
+ sum_valid_data = 0,
+ sum_file_size = 0,
+ pending_gc_completion = [],
+ gc_active = false,
+ gc_pid = undefined,
+ file_handles_ets = FileHandlesEts,
+ file_summary_ets = FileSummaryEts,
+ dedup_cache_ets = DedupCacheEts,
+ cur_file_cache_ets = CurFileCacheEts
+ },
ok = count_msg_refs(MsgRefDeltaGen, MsgRefDeltaGenInit, State),
FileNames =
@@ -490,9 +522,11 @@ init([Dir, MsgRefDeltaGen, MsgRefDeltaGenInit]) ->
{ok, Offset} = file_handle_cache:position(FileHdl, Offset),
ok = file_handle_cache:truncate(FileHdl),
- {ok, _Pid} = rabbit_msg_store_gc:start_link(Dir, IndexState, IndexModule),
+ {ok, GCPid} = rabbit_msg_store_gc:start_link(Dir, IndexState, IndexModule,
+ FileSummaryEts),
- {ok, State1 #msstate { current_file_handle = FileHdl }, hibernate,
+ {ok, State1 #msstate { current_file_handle = FileHdl,
+ gc_pid = GCPid }, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
handle_call({read, MsgId}, From, State) ->
@@ -504,16 +538,23 @@ handle_call({contains, MsgId}, From, State) ->
noreply(State1);
handle_call(new_client_state, _From,
- State = #msstate { index_state = IndexState, dir = Dir,
- index_module = IndexModule }) ->
- reply({IndexState, IndexModule, Dir}, State).
+ State = #msstate { index_state = IndexState, dir = Dir,
+ index_module = IndexModule,
+ file_handles_ets = FileHandlesEts,
+ file_summary_ets = FileSummaryEts,
+ dedup_cache_ets = DedupCacheEts,
+ cur_file_cache_ets = CurFileCacheEts }) ->
+ reply({IndexState, IndexModule, Dir, FileHandlesEts, FileSummaryEts,
+ DedupCacheEts, CurFileCacheEts}, State).
handle_cast({write, MsgId, Msg},
State = #msstate { current_file_handle = CurHdl,
current_file = CurFile,
sum_valid_data = SumValid,
- sum_file_size = SumFileSize }) ->
- true = 0 =< ets:update_counter(?CUR_FILE_CACHE_ETS_NAME, MsgId, {3, -1}),
+ sum_file_size = SumFileSize,
+ file_summary_ets = FileSummaryEts,
+ cur_file_cache_ets = CurFileCacheEts }) ->
+ true = 0 =< ets:update_counter(CurFileCacheEts, MsgId, {3, -1}),
case index_lookup(MsgId, State) of
not_found ->
%% New message, lots to do
@@ -528,7 +569,7 @@ handle_cast({write, MsgId, Msg},
right = undefined,
locked = false,
file_size = FileSize }] =
- ets:lookup(?FILE_SUMMARY_ETS_NAME, CurFile),
+ ets:lookup(FileSummaryEts, CurFile),
ValidTotalSize1 = ValidTotalSize + TotalSize,
ContiguousTop1 = if CurOffset =:= ContiguousTop ->
%% can't be any holes in this file
@@ -536,7 +577,7 @@ handle_cast({write, MsgId, Msg},
true -> ContiguousTop
end,
true = ets:update_element(
- ?FILE_SUMMARY_ETS_NAME,
+ FileSummaryEts,
CurFile,
[{#file_summary.valid_total_size, ValidTotalSize1},
{#file_summary.contiguous_top, ContiguousTop1},
@@ -562,8 +603,10 @@ handle_cast({remove, MsgIds}, State) ->
State, MsgIds),
noreply(maybe_compact(State1));
-handle_cast({release, MsgIds}, State) ->
- lists:foreach(fun (MsgId) -> decrement_cache(MsgId) end, MsgIds),
+handle_cast({release, MsgIds}, State =
+ #msstate { dedup_cache_ets = DedupCacheEts }) ->
+ lists:foreach(
+ fun (MsgId) -> decrement_cache(DedupCacheEts, MsgId) end, MsgIds),
noreply(State);
handle_cast({sync, MsgIds, K},
@@ -582,32 +625,34 @@ handle_cast({sync, MsgIds, K},
end;
handle_cast(sync, State) ->
- noreply(sync(State));
+ noreply(internal_sync(State));
handle_cast({gc_done, Reclaimed, Source, Dest},
State = #msstate { sum_file_size = SumFileSize,
- gc_active = {Source, Dest} }) ->
+ gc_active = {Source, Dest},
+ file_handles_ets = FileHandlesEts,
+ file_summary_ets = FileSummaryEts }) ->
%% GC done, so now ensure that any clients that have open fhs to
%% those files close them before using them again. This has to be
%% done here, and not when starting up the GC, because if done
%% when starting up the GC, the client could find the close, and
%% close and reopen the fh, whilst the GC is waiting for readers
%% to disappear, before it's actually done the GC.
- true = mark_handle_to_close(Source),
- true = mark_handle_to_close(Dest),
+ true = mark_handle_to_close(FileHandlesEts, Source),
+ true = mark_handle_to_close(FileHandlesEts, Dest),
%% we always move data left, so Source has gone and was on the
%% right, so need to make dest = source.right.left, and also
%% dest.right = source.right
[#file_summary { left = Dest, right = SourceRight, locked = true,
readers = 0 }] =
- ets:lookup(?FILE_SUMMARY_ETS_NAME, Source),
+ ets:lookup(FileSummaryEts, Source),
%% this could fail if SourceRight == undefined
- ets:update_element(?FILE_SUMMARY_ETS_NAME, SourceRight,
+ ets:update_element(FileSummaryEts, SourceRight,
{#file_summary.left, Dest}),
- true = ets:update_element(?FILE_SUMMARY_ETS_NAME, Dest,
+ true = ets:update_element(FileSummaryEts, Dest,
[{#file_summary.locked, false},
{#file_summary.right, SourceRight}]),
- true = ets:delete(?FILE_SUMMARY_ETS_NAME, Source),
+ true = ets:delete(FileSummaryEts, Source),
noreply(run_pending(
State #msstate { sum_file_size = SumFileSize - Reclaimed,
gc_active = false }));
@@ -617,28 +662,33 @@ handle_cast({set_maximum_since_use, Age}, State) ->
noreply(State).
handle_info(timeout, State) ->
- noreply(sync(State));
+ noreply(internal_sync(State));
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State}.
-terminate(_Reason, State = #msstate { index_state = IndexState,
- index_module = IndexModule,
- current_file_handle = FileHdl }) ->
+terminate(_Reason, State = #msstate { index_state = IndexState,
+ index_module = IndexModule,
+ current_file_handle = FileHdl,
+ gc_pid = GCPid,
+ file_handles_ets = FileHandlesEts,
+ file_summary_ets = FileSummaryEts,
+ dedup_cache_ets = DedupCacheEts,
+ cur_file_cache_ets = CurFileCacheEts }) ->
%% stop the gc first, otherwise it could be working and we pull
%% out the ets tables from under it.
- ok = rabbit_msg_store_gc:stop(),
+ ok = rabbit_msg_store_gc:stop(GCPid),
State1 = case FileHdl of
undefined -> State;
- _ -> State2 = sync(State),
+ _ -> State2 = internal_sync(State),
file_handle_cache:close(FileHdl),
State2
end,
State3 = close_all_handles(State1),
- ets:delete(?FILE_SUMMARY_ETS_NAME),
- ets:delete(?CACHE_ETS_NAME),
- ets:delete(?FILE_HANDLES_ETS_NAME),
- ets:delete(?CUR_FILE_CACHE_ETS_NAME),
+ ets:delete(FileSummaryEts),
+ ets:delete(DedupCacheEts),
+ ets:delete(FileHandlesEts),
+ ets:delete(CurFileCacheEts),
IndexModule:terminate(IndexState),
State3 #msstate { index_state = undefined,
current_file_handle = undefined }.
@@ -686,7 +736,7 @@ sort_file_names(FileNames) ->
lists:sort(fun (A, B) -> filename_to_num(A) < filename_to_num(B) end,
FileNames).
-sync(State = #msstate { current_file_handle = CurHdl,
+internal_sync(State = #msstate { current_file_handle = CurHdl,
on_sync = Syncs }) ->
State1 = stop_sync_timer(State),
case Syncs of
@@ -697,12 +747,13 @@ sync(State = #msstate { current_file_handle = CurHdl,
State1 #msstate { on_sync = [] }
end.
-read_message(MsgId, From, State) ->
+read_message(MsgId, From, State =
+ #msstate { dedup_cache_ets = DedupCacheEts }) ->
case index_lookup(MsgId, State) of
not_found -> gen_server2:reply(From, not_found),
State;
MsgLocation ->
- case fetch_and_increment_cache(MsgId) of
+ case fetch_and_increment_cache(DedupCacheEts, MsgId) of
not_found ->
read_message1(From, MsgLocation, State);
Msg ->
@@ -714,33 +765,36 @@ read_message(MsgId, From, State) ->
read_message1(From, #msg_location { msg_id = MsgId, ref_count = RefCount,
file = File, offset = Offset } = MsgLoc,
State = #msstate { current_file = CurFile,
- current_file_handle = CurHdl }) ->
+ current_file_handle = CurHdl,
+ file_summary_ets = FileSummaryEts,
+ dedup_cache_ets = DedupCacheEts,
+ cur_file_cache_ets = CurFileCacheEts }) ->
case File =:= CurFile of
true ->
{Msg, State1} =
%% can return [] if msg in file existed on startup
- case ets:lookup(?CUR_FILE_CACHE_ETS_NAME, MsgId) of
+ case ets:lookup(CurFileCacheEts, MsgId) of
[] ->
ok = case {ok, Offset} >=
file_handle_cache:current_raw_offset(CurHdl) of
true -> file_handle_cache:flush(CurHdl);
false -> ok
end,
- read_from_disk(MsgLoc, State);
+ read_from_disk(MsgLoc, State, DedupCacheEts);
[{MsgId, Msg1, _CacheRefCount}] ->
{Msg1, State}
end,
- ok = maybe_insert_into_cache(RefCount, MsgId, Msg),
+ ok = maybe_insert_into_cache(DedupCacheEts, RefCount, MsgId, Msg),
gen_server2:reply(From, {ok, Msg}),
State1;
false ->
[#file_summary { locked = Locked }] =
- ets:lookup(?FILE_SUMMARY_ETS_NAME, File),
+ ets:lookup(FileSummaryEts, File),
case Locked of
true ->
add_to_pending_gc_completion({read, MsgId, From}, State);
false ->
- {Msg, State1} = read_from_disk(MsgLoc, State),
+ {Msg, State1} = read_from_disk(MsgLoc, State, DedupCacheEts),
gen_server2:reply(From, {ok, Msg}),
State1
end
@@ -748,7 +802,8 @@ read_message1(From, #msg_location { msg_id = MsgId, ref_count = RefCount,
read_from_disk(#msg_location { msg_id = MsgId, ref_count = RefCount,
file = File, offset = Offset,
- total_size = TotalSize }, State) ->
+ total_size = TotalSize }, State,
+ DedupCacheEts) ->
{Hdl, State1} = get_read_handle(File, State),
{ok, Offset} = file_handle_cache:position(Hdl, Offset),
{ok, {MsgId, Msg}} =
@@ -764,12 +819,13 @@ read_from_disk(#msg_location { msg_id = MsgId, ref_count = RefCount,
{proc_dict, get()}
]}})
end,
- ok = maybe_insert_into_cache(RefCount, MsgId, Msg),
+ ok = maybe_insert_into_cache(DedupCacheEts, RefCount, MsgId, Msg),
{Msg, State1}.
-maybe_insert_into_cache(RefCount, MsgId, Msg) when RefCount > 1 ->
- insert_into_cache(MsgId, Msg);
-maybe_insert_into_cache(_RefCount, _MsgId, _Msg) ->
+maybe_insert_into_cache(DedupCacheEts, RefCount, MsgId, Msg)
+ when RefCount > 1 ->
+ insert_into_cache(DedupCacheEts, MsgId, Msg);
+maybe_insert_into_cache(_DedupCacheEts, _RefCount, _MsgId, _Msg) ->
ok.
contains_message(MsgId, From, State = #msstate { gc_active = GCActive }) ->
@@ -788,7 +844,9 @@ contains_message(MsgId, From, State = #msstate { gc_active = GCActive }) ->
end
end.
-remove_message(MsgId, State = #msstate { sum_valid_data = SumValid }) ->
+remove_message(MsgId, State = #msstate { sum_valid_data = SumValid,
+ file_summary_ets = FileSummaryEts,
+ dedup_cache_ets = DedupCacheEts }) ->
#msg_location { ref_count = RefCount, file = File,
offset = Offset, total_size = TotalSize } =
index_lookup(MsgId, State),
@@ -797,11 +855,11 @@ remove_message(MsgId, State = #msstate { sum_valid_data = SumValid }) ->
%% don't remove from CUR_FILE_CACHE_ETS_NAME here because
%% there may be further writes in the mailbox for the same
%% msg.
- ok = remove_cache_entry(MsgId),
+ ok = remove_cache_entry(DedupCacheEts, MsgId),
[#file_summary { valid_total_size = ValidTotalSize,
contiguous_top = ContiguousTop,
locked = Locked }] =
- ets:lookup(?FILE_SUMMARY_ETS_NAME, File),
+ ets:lookup(FileSummaryEts, File),
case Locked of
true ->
add_to_pending_gc_completion({remove, MsgId}, State);
@@ -810,15 +868,14 @@ remove_message(MsgId, State = #msstate { sum_valid_data = SumValid }) ->
ContiguousTop1 = lists:min([ContiguousTop, Offset]),
ValidTotalSize1 = ValidTotalSize - TotalSize,
true = ets:update_element(
- ?FILE_SUMMARY_ETS_NAME,
- File,
+ FileSummaryEts, File,
[{#file_summary.valid_total_size, ValidTotalSize1},
{#file_summary.contiguous_top, ContiguousTop1}]),
State1 = delete_file_if_empty(File, State),
State1 #msstate { sum_valid_data = SumValid - TotalSize }
end;
_ when 1 < RefCount ->
- ok = decrement_cache(MsgId),
+ ok = decrement_cache(DedupCacheEts, MsgId),
%% only update field, otherwise bad interaction with concurrent GC
ok = index_update_fields(MsgId,
{#msg_location.ref_count, RefCount - 1},
@@ -857,11 +914,12 @@ close_handle(Key, FHC) ->
error -> FHC
end.
-close_all_handles(CState = #client_msstate { file_handle_cache = FHC }) ->
+close_all_handles(CState = #client_msstate { file_handles_ets = FileHandlesEts,
+ file_handle_cache = FHC }) ->
Self = self(),
ok = dict:fold(fun (File, Hdl, ok) ->
true =
- ets:delete(?FILE_HANDLES_ETS_NAME, {Self, File}),
+ ets:delete(FileHandlesEts, {Self, File}),
file_handle_cache:close(Hdl)
end, ok, FHC),
CState #client_msstate { file_handle_cache = dict:new() };
@@ -897,27 +955,27 @@ get_read_handle(FileNum, FHC, Dir) ->
%% message cache helper functions
%%----------------------------------------------------------------------------
-remove_cache_entry(MsgId) ->
- true = ets:delete(?CACHE_ETS_NAME, MsgId),
+remove_cache_entry(DedupCacheEts, MsgId) ->
+ true = ets:delete(DedupCacheEts, MsgId),
ok.
-fetch_and_increment_cache(MsgId) ->
- case ets:lookup(?CACHE_ETS_NAME, MsgId) of
+fetch_and_increment_cache(DedupCacheEts, MsgId) ->
+ case ets:lookup(DedupCacheEts, MsgId) of
[] ->
not_found;
[{_MsgId, Msg, _RefCount}] ->
try
- ets:update_counter(?CACHE_ETS_NAME, MsgId, {3, 1})
+ ets:update_counter(DedupCacheEts, MsgId, {3, 1})
catch error:badarg ->
%% someone has deleted us in the meantime, insert us
- ok = insert_into_cache(MsgId, Msg)
+ ok = insert_into_cache(DedupCacheEts, MsgId, Msg)
end,
Msg
end.
-decrement_cache(MsgId) ->
- true = try case ets:update_counter(?CACHE_ETS_NAME, MsgId, {3, -1}) of
- N when N =< 0 -> true = ets:delete(?CACHE_ETS_NAME, MsgId);
+decrement_cache(DedupCacheEts, MsgId) ->
+ true = try case ets:update_counter(DedupCacheEts, MsgId, {3, -1}) of
+ N when N =< 0 -> true = ets:delete(DedupCacheEts, MsgId);
_N -> true
end
catch error:badarg ->
@@ -928,14 +986,14 @@ decrement_cache(MsgId) ->
end,
ok.
-insert_into_cache(MsgId, Msg) ->
- case ets:insert_new(?CACHE_ETS_NAME, {MsgId, Msg, 1}) of
+insert_into_cache(DedupCacheEts, MsgId, Msg) ->
+ case ets:insert_new(DedupCacheEts, {MsgId, Msg, 1}) of
true -> ok;
false -> try
- ets:update_counter(?CACHE_ETS_NAME, MsgId, {3, 1}),
+ ets:update_counter(DedupCacheEts, MsgId, {3, 1}),
ok
catch error:badarg ->
- insert_into_cache(MsgId, Msg)
+ insert_into_cache(DedupCacheEts, MsgId, Msg)
end
end.
@@ -1125,16 +1183,17 @@ build_index(Files, State) ->
{Offset, State1} = build_index(undefined, Files, State),
{Offset, lists:foldl(fun delete_file_if_empty/2, State1, Files)}.
-build_index(Left, [], State) ->
+build_index(Left, [], State = #msstate { file_summary_ets = FileSummaryEts }) ->
ok = index_delete_by_file(undefined, State),
- Offset = case ets:lookup(?FILE_SUMMARY_ETS_NAME, Left) of
+ Offset = case ets:lookup(FileSummaryEts, Left) of
[] -> 0;
[#file_summary { file_size = FileSize }] -> FileSize
end,
{Offset, State #msstate { current_file = Left }};
build_index(Left, [File|Files],
State = #msstate { dir = Dir, sum_valid_data = SumValid,
- sum_file_size = SumFileSize }) ->
+ sum_file_size = SumFileSize,
+ file_summary_ets = FileSummaryEts }) ->
{ok, Messages, FileSize} =
rabbit_msg_store_misc:scan_file_for_valid_messages(
Dir, rabbit_msg_store_misc:filenum_to_name(File)),
@@ -1168,7 +1227,7 @@ build_index(Left, [File|Files],
[F|_] -> {F, FileSize}
end,
true =
- ets:insert_new(?FILE_SUMMARY_ETS_NAME, #file_summary {
+ ets:insert_new(FileSummaryEts, #file_summary {
file = File, valid_total_size = ValidTotalSize,
contiguous_top = ContiguousTop, locked = false,
left = Left, right = Right, file_size = FileSize1,
@@ -1184,61 +1243,65 @@ build_index(Left, [File|Files],
maybe_roll_to_new_file(Offset,
State = #msstate { dir = Dir,
current_file_handle = CurHdl,
- current_file = CurFile })
+ current_file = CurFile,
+ file_summary_ets = FileSummaryEts,
+ cur_file_cache_ets = CurFileCacheEts })
when Offset >= ?FILE_SIZE_LIMIT ->
- State1 = sync(State),
+ State1 = internal_sync(State),
ok = file_handle_cache:close(CurHdl),
NextFile = CurFile + 1,
{ok, NextHdl} = rabbit_msg_store_misc:open_file(
Dir, rabbit_msg_store_misc:filenum_to_name(NextFile),
?WRITE_MODE),
true = ets:insert_new(
- ?FILE_SUMMARY_ETS_NAME, #file_summary {
+ FileSummaryEts, #file_summary {
file = NextFile, valid_total_size = 0, contiguous_top = 0,
left = CurFile, right = undefined, file_size = 0,
locked = false, readers = 0 }),
- true = ets:update_element(?FILE_SUMMARY_ETS_NAME, CurFile,
+ true = ets:update_element(FileSummaryEts, CurFile,
{#file_summary.right, NextFile}),
- true = ets:match_delete(?CUR_FILE_CACHE_ETS_NAME, {'_', '_', 0}),
+ true = ets:match_delete(CurFileCacheEts, {'_', '_', 0}),
State1 #msstate { current_file_handle = NextHdl,
current_file = NextFile };
maybe_roll_to_new_file(_, State) ->
State.
-maybe_compact(State = #msstate { sum_valid_data = SumValid,
- sum_file_size = SumFileSize,
- gc_active = false })
+maybe_compact(State = #msstate { sum_valid_data = SumValid,
+ sum_file_size = SumFileSize,
+ gc_active = false,
+ gc_pid = GCPid,
+ file_summary_ets = FileSummaryEts })
when (SumFileSize - SumValid) / SumFileSize > ?GARBAGE_FRACTION ->
- First = ets:first(?FILE_SUMMARY_ETS_NAME),
+ First = ets:first(FileSummaryEts),
N = random_distributions:geometric(?GEOMETRIC_P),
- case find_files_to_gc(N, First) of
+ case find_files_to_gc(FileSummaryEts, N, First) of
undefined ->
State;
{Source, Dest} ->
State1 = close_handle(Source, close_handle(Dest, State)),
- true = ets:update_element(?FILE_SUMMARY_ETS_NAME, Source,
+ true = ets:update_element(FileSummaryEts, Source,
{#file_summary.locked, true}),
- true = ets:update_element(?FILE_SUMMARY_ETS_NAME, Dest,
+ true = ets:update_element(FileSummaryEts, Dest,
{#file_summary.locked, true}),
- ok = rabbit_msg_store_gc:gc(Source, Dest),
+ ok = rabbit_msg_store_gc:gc(GCPid, Source, Dest),
State1 #msstate { gc_active = {Source, Dest} }
end;
maybe_compact(State) ->
State.
-mark_handle_to_close(File) ->
- [ ets:update_element(?FILE_HANDLES_ETS_NAME, Key, {2, close})
- || {Key, open} <- ets:match_object(?FILE_HANDLES_ETS_NAME,
+mark_handle_to_close(FileHandlesEts, File) ->
+ [ ets:update_element(FileHandlesEts, Key, {2, close})
+ || {Key, open} <- ets:match_object(FileHandlesEts,
{{'_', File}, open}) ],
true.
-find_files_to_gc(_N, '$end_of_table') ->
+find_files_to_gc(_FileSummaryEts, _N, '$end_of_table') ->
undefined;
-find_files_to_gc(N, First) ->
+find_files_to_gc(FileSummaryEts, N, First) ->
[FirstObj = #file_summary { right = Right }] =
- ets:lookup(?FILE_SUMMARY_ETS_NAME, First),
- Pairs = find_files_to_gc(N, FirstObj,
- ets:lookup(?FILE_SUMMARY_ETS_NAME, Right), []),
+ ets:lookup(FileSummaryEts, First),
+ Pairs = find_files_to_gc(FileSummaryEts, N, FirstObj,
+ ets:lookup(FileSummaryEts, Right), []),
case Pairs of
[] -> undefined;
[Pair] -> Pair;
@@ -1246,9 +1309,9 @@ find_files_to_gc(N, First) ->
lists:nth(M, Pairs)
end.
-find_files_to_gc(_N, #file_summary {}, [], Pairs) ->
+find_files_to_gc(_FileSummaryEts, _N, #file_summary {}, [], Pairs) ->
lists:reverse(Pairs);
-find_files_to_gc(N,
+find_files_to_gc(FileSummaryEts, N,
#file_summary { right = Source, file = Dest,
valid_total_size = DestValid },
[SourceObj = #file_summary { left = Dest, right = SourceRight,
@@ -1259,22 +1322,24 @@ find_files_to_gc(N,
Pair = {Source, Dest},
case N == 1 of
true -> [Pair];
- false -> find_files_to_gc((N - 1), SourceObj,
- ets:lookup(?FILE_SUMMARY_ETS_NAME, SourceRight),
+ false -> find_files_to_gc(FileSummaryEts, (N - 1), SourceObj,
+ ets:lookup(FileSummaryEts, SourceRight),
[Pair | Pairs])
end;
-find_files_to_gc(N, _Left,
+find_files_to_gc(FileSummaryEts, N, _Left,
[Right = #file_summary { right = RightRight }], Pairs) ->
find_files_to_gc(
- N, Right, ets:lookup(?FILE_SUMMARY_ETS_NAME, RightRight), Pairs).
+ FileSummaryEts, N, Right, ets:lookup(FileSummaryEts, RightRight), Pairs).
delete_file_if_empty(File, State = #msstate { current_file = File }) ->
State;
delete_file_if_empty(File, State =
- #msstate { dir = Dir, sum_file_size = SumFileSize }) ->
+ #msstate { dir = Dir, sum_file_size = SumFileSize,
+ file_handles_ets = FileHandlesEts,
+ file_summary_ets = FileSummaryEts }) ->
[#file_summary { valid_total_size = ValidData, file_size = FileSize,
left = Left, right = Right, locked = false }]
- = ets:lookup(?FILE_SUMMARY_ETS_NAME, File),
+ = ets:lookup(FileSummaryEts, File),
case ValidData of
%% we should NEVER find the current file in here hence right
%% should always be a file, not undefined
@@ -1282,16 +1347,16 @@ delete_file_if_empty(File, State =
{undefined, _} when not is_atom(Right) ->
%% the eldest file is empty.
true = ets:update_element(
- ?FILE_SUMMARY_ETS_NAME, Right,
+ FileSummaryEts, Right,
{#file_summary.left, undefined});
{_, _} when not is_atom(Right) ->
- true = ets:update_element(?FILE_SUMMARY_ETS_NAME, Right,
+ true = ets:update_element(FileSummaryEts, Right,
{#file_summary.left, Left}),
- true = ets:update_element(?FILE_SUMMARY_ETS_NAME, Left,
+ true = ets:update_element(FileSummaryEts, Left,
{#file_summary.right, Right})
end,
- true = mark_handle_to_close(File),
- true = ets:delete(?FILE_SUMMARY_ETS_NAME, File),
+ true = mark_handle_to_close(FileHandlesEts, File),
+ true = ets:delete(FileSummaryEts, File),
State1 = close_handle(File, State),
ok = file:delete(rabbit_msg_store_misc:form_filename(
Dir,
diff --git a/src/rabbit_msg_store_gc.erl b/src/rabbit_msg_store_gc.erl
index a64733dfc2..9cf11af29f 100644
--- a/src/rabbit_msg_store_gc.erl
+++ b/src/rabbit_msg_store_gc.erl
@@ -33,7 +33,7 @@
-behaviour(gen_server2).
--export([start_link/3, gc/2, stop/0]).
+-export([start_link/4, gc/3, stop/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@@ -41,40 +41,42 @@
-record(gcstate,
{dir,
index_state,
- index_module
+ index_module,
+ parent,
+ file_summary_ets
}).
-include("rabbit_msg_store.hrl").
--define(SERVER, ?MODULE).
-
%%----------------------------------------------------------------------------
-start_link(Dir, IndexState, IndexModule) ->
- gen_server2:start_link({local, ?SERVER}, ?MODULE,
- [Dir, IndexState, IndexModule],
- [{timeout, infinity}]).
+start_link(Dir, IndexState, IndexModule, FileSummaryEts) ->
+ gen_server2:start_link(
+ ?MODULE, [self(), Dir, IndexState, IndexModule, FileSummaryEts],
+ [{timeout, infinity}]).
-gc(Source, Destination) ->
- gen_server2:cast(?SERVER, {gc, Source, Destination}).
+gc(Server, Source, Destination) ->
+ gen_server2:cast(Server, {gc, Source, Destination}).
-stop() ->
- gen_server2:call(?SERVER, stop).
+stop(Server) ->
+ gen_server2:call(Server, stop).
%%----------------------------------------------------------------------------
-init([Dir, IndexState, IndexModule]) ->
+init([Parent, Dir, IndexState, IndexModule, FileSummaryEts]) ->
{ok, #gcstate { dir = Dir, index_state = IndexState,
- index_module = IndexModule },
+ index_module = IndexModule, parent = Parent,
+ file_summary_ets = FileSummaryEts},
hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
handle_call(stop, _From, State) ->
{stop, normal, ok, State}.
-handle_cast({gc, Source, Destination}, State) ->
- Reclaimed = adjust_meta_and_combine(Source, Destination, State),
- ok = rabbit_msg_store:gc_done(Reclaimed, Source, Destination),
+handle_cast({gc, Source, Destination}, State = #gcstate { parent = Parent }) ->
+ Reclaimed = adjust_meta_and_combine(Source, Destination,
+ State),
+ ok = rabbit_msg_store:gc_done(Parent, Reclaimed, Source, Destination),
{noreply, State, hibernate}.
handle_info({file_handle_cache, maximum_eldest_since_use, Age}, State) ->
@@ -92,18 +94,19 @@ code_change(_OldVsn, State, _Extra) ->
%%----------------------------------------------------------------------------
-adjust_meta_and_combine(SourceFile, DestFile, State) ->
+adjust_meta_and_combine(SourceFile, DestFile, State =
+ #gcstate { file_summary_ets = FileSummaryEts }) ->
[SourceObj = #file_summary {
readers = SourceReaders,
valid_total_size = SourceValidData, left = DestFile,
file_size = SourceFileSize, locked = true }] =
- ets:lookup(?FILE_SUMMARY_ETS_NAME, SourceFile),
+ ets:lookup(FileSummaryEts, SourceFile),
[DestObj = #file_summary {
readers = DestReaders,
valid_total_size = DestValidData, right = SourceFile,
file_size = DestFileSize, locked = true }] =
- ets:lookup(?FILE_SUMMARY_ETS_NAME, DestFile),
+ ets:lookup(FileSummaryEts, DestFile),
case SourceReaders =:= 0 andalso DestReaders =:= 0 of
true ->
@@ -112,7 +115,7 @@ adjust_meta_and_combine(SourceFile, DestFile, State) ->
%% don't update dest.right, because it could be changing
%% at the same time
true = ets:update_element(
- ?FILE_SUMMARY_ETS_NAME, DestFile,
+ FileSummaryEts, DestFile,
[{#file_summary.valid_total_size, TotalValidData},
{#file_summary.contiguous_top, TotalValidData},
{#file_summary.file_size, TotalValidData}]),
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index a16efb20c0..c0c1b40bd5 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -34,7 +34,8 @@
-export([init/1, terminate/1, terminate_and_erase/1, write_published/4,
write_delivered/2, write_acks/2, sync_seq_ids/2, flush_journal/1,
read_segment_entries/2, next_segment_boundary/1, segment_size/0,
- find_lowest_seq_id_seg_and_next_seq_id/1, start_msg_store/1]).
+ find_lowest_seq_id_seg_and_next_seq_id/1,
+ start_persistent_msg_store/1]).
-define(CLEAN_FILENAME, "clean.dot").
@@ -207,7 +208,7 @@
-spec(segment_size/0 :: () -> non_neg_integer()).
-spec(find_lowest_seq_id_seg_and_next_seq_id/1 :: (qistate()) ->
{non_neg_integer(), non_neg_integer(), qistate()}).
--spec(start_msg_store/1 :: ([amqqueue()]) -> 'ok').
+-spec(start_persistent_msg_store/1 :: ([amqqueue()]) -> 'ok').
-endif.
@@ -250,7 +251,8 @@ init(Name) ->
{Segment3, DCountAcc2}) ->
{Segment4, DCountDelta} =
maybe_add_to_journal(
- rabbit_msg_store:contains(MsgId),
+ rabbit_msg_store:contains(
+ ?PERSISTENT_MSG_STORE, MsgId),
CleanShutdown, Del, RelSeq, Segment3),
{Segment4, DCountAcc2 + DCountDelta}
end, {Segment1 #segment { pubs = PubCount,
@@ -379,7 +381,7 @@ find_lowest_seq_id_seg_and_next_seq_id(State) ->
end,
{LowSeqIdSeg, NextSeqId, State}.
-start_msg_store(DurableQueues) ->
+start_persistent_msg_store(DurableQueues) ->
DurableDict =
dict:from_list([ {queue_name_to_dir_name(Queue #amqqueue.name),
Queue #amqqueue.name} || Queue <- DurableQueues ]),
@@ -404,10 +406,9 @@ start_msg_store(DurableQueues) ->
{DurableAcc, [QueueDir | TransientAcc]}
end
end, {[], []}, Directories),
- MsgStoreDir = filename:join(rabbit_mnesia:dir(), "msg_store"),
- ok = rabbit_sup:start_child(rabbit_msg_store, [MsgStoreDir,
- fun queue_index_walker/1,
- DurableQueueNames]),
+ ok = rabbit_sup:start_child(?PERSISTENT_MSG_STORE, rabbit_msg_store,
+ [?PERSISTENT_MSG_STORE, rabbit_mnesia:dir(),
+ fun queue_index_walker/1, DurableQueueNames]),
lists:foreach(fun (DirName) ->
Dir = filename:join(queues_dir(), DirName),
ok = delete_queue_directory(Dir)
diff --git a/src/rabbit_sup.erl b/src/rabbit_sup.erl
index 25715e6e29..2c5e51125e 100644
--- a/src/rabbit_sup.erl
+++ b/src/rabbit_sup.erl
@@ -33,7 +33,7 @@
-behaviour(supervisor).
--export([start_link/0, start_child/1, start_child/2,
+-export([start_link/0, start_child/1, start_child/2, start_child/3,
start_restartable_child/1, start_restartable_child/2]).
-export([init/1]).
@@ -49,8 +49,11 @@ start_child(Mod) ->
start_child(Mod, []).
start_child(Mod, Args) ->
+ start_child(Mod, Mod, Args).
+
+start_child(ChildId, Mod, Args) ->
{ok, _} = supervisor:start_child(?SERVER,
- {Mod, {Mod, start_link, Args},
+ {ChildId, {Mod, start_link, Args},
transient, ?MAX_WAIT, worker, [Mod]}),
ok.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 96c3f1bcf2..3ccb83b6f4 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -991,20 +991,28 @@ bad_handle_hook(_, _, _) ->
extra_arg_hook(Hookname, Handler, Args, Extra1, Extra2) ->
handle_hook(Hookname, Handler, {Args, Extra1, Extra2}).
-msg_store_dir() ->
- filename:join(rabbit_mnesia:dir(), "msg_store").
-
start_msg_store_empty() ->
start_msg_store(fun (ok) -> finished end, ok).
start_msg_store(MsgRefDeltaGen, MsgRefDeltaGenInit) ->
- rabbit_sup:start_child(rabbit_msg_store, [msg_store_dir(), MsgRefDeltaGen,
- MsgRefDeltaGenInit]).
+ ok = rabbit_sup:start_child(?PERSISTENT_MSG_STORE, rabbit_msg_store,
+ [?PERSISTENT_MSG_STORE, rabbit_mnesia:dir(),
+ MsgRefDeltaGen, MsgRefDeltaGenInit]),
+ start_transient_msg_store().
+
+start_transient_msg_store() ->
+ ok = rabbit_sup:start_child(?TRANSIENT_MSG_STORE, rabbit_msg_store,
+ [?TRANSIENT_MSG_STORE, rabbit_mnesia:dir(),
+ fun (ok) -> finished end, ok]).
stop_msg_store() ->
- case supervisor:terminate_child(rabbit_sup, rabbit_msg_store) of
- ok -> supervisor:delete_child(rabbit_sup, rabbit_msg_store);
- E -> E
+ case supervisor:terminate_child(rabbit_sup, ?PERSISTENT_MSG_STORE) of
+ ok -> supervisor:delete_child(rabbit_sup, ?PERSISTENT_MSG_STORE),
+ case supervisor:terminate_child(rabbit_sup, ?TRANSIENT_MSG_STORE) of
+ ok -> supervisor:delete_child(rabbit_sup, ?TRANSIENT_MSG_STORE);
+ E -> {transient, E}
+ end;
+ E -> {persistent, E}
end.
msg_id_bin(X) ->
@@ -1012,13 +1020,14 @@ msg_id_bin(X) ->
msg_store_contains(Atom, MsgIds) ->
Atom = lists:foldl(
- fun (MsgId, Atom1) when Atom1 =:= Atom ->
- rabbit_msg_store:contains(MsgId) end, Atom, MsgIds).
+ fun (MsgId, Atom1) when Atom1 =:= Atom ->
+ rabbit_msg_store:contains(?PERSISTENT_MSG_STORE, MsgId) end,
+ Atom, MsgIds).
msg_store_sync(MsgIds) ->
Ref = make_ref(),
Self = self(),
- ok = rabbit_msg_store:sync(MsgIds,
+ ok = rabbit_msg_store:sync(?PERSISTENT_MSG_STORE, MsgIds,
fun () -> Self ! {sync, Ref} end),
receive
{sync, Ref} -> ok
@@ -1031,15 +1040,17 @@ msg_store_sync(MsgIds) ->
msg_store_read(MsgIds, MSCState) ->
lists:foldl(
fun (MsgId, MSCStateM) ->
- {{ok, MsgId}, MSCStateN} = rabbit_msg_store:read(MsgId, MSCStateM),
+ {{ok, MsgId}, MSCStateN} = rabbit_msg_store:read(
+ ?PERSISTENT_MSG_STORE, MsgId, MSCStateM),
MSCStateN
end,
MSCState, MsgIds).
-msg_store_write(MsgIds) ->
- ok = lists:foldl(
- fun (MsgId, ok) -> rabbit_msg_store:write(MsgId, MsgId) end,
- ok, MsgIds).
+msg_store_write(MsgIds, MSCState) ->
+ lists:foldl(
+ fun (MsgId, {ok, MSCStateN}) ->
+ rabbit_msg_store:write(?PERSISTENT_MSG_STORE, MsgId, MsgId, MSCStateN) end,
+ {ok, MSCState}, MsgIds).
test_msg_store() ->
stop_msg_store(),
@@ -1049,25 +1060,27 @@ test_msg_store() ->
{MsgIds1stHalf, MsgIds2ndHalf} = lists:split(50, MsgIds),
%% check we don't contain any of the msgs we're about to publish
false = msg_store_contains(false, MsgIds),
+ MSCState = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE),
%% publish the first half
- ok = msg_store_write(MsgIds1stHalf),
+ {ok, MSCState1} = msg_store_write(MsgIds1stHalf, MSCState),
%% sync on the first half
ok = msg_store_sync(MsgIds1stHalf),
%% publish the second half
- ok = msg_store_write(MsgIds2ndHalf),
+ {ok, MSCState2} = msg_store_write(MsgIds2ndHalf, MSCState1),
%% sync on the first half again - the msg_store will be dirty, but
%% we won't need the fsync
ok = msg_store_sync(MsgIds1stHalf),
%% check they're all in there
true = msg_store_contains(true, MsgIds),
%% publish the latter half twice so we hit the caching and ref count code
- ok = msg_store_write(MsgIds2ndHalf),
+ {ok, MSCState3} = msg_store_write(MsgIds2ndHalf, MSCState2),
%% check they're still all in there
true = msg_store_contains(true, MsgIds),
%% sync on the 2nd half, but do lots of individual syncs to try
%% and cause coalescing to happen
ok = lists:foldl(
fun (MsgId, ok) -> rabbit_msg_store:sync(
+ ?PERSISTENT_MSG_STORE,
[MsgId], fun () -> Self ! {sync, MsgId} end)
end, ok, MsgIds2ndHalf),
lists:foldl(
@@ -1085,23 +1098,22 @@ test_msg_store() ->
%% should hit a different code path
ok = msg_store_sync(MsgIds1stHalf),
%% read them all
- MSCState = rabbit_msg_store:client_init(),
- MSCState1 = msg_store_read(MsgIds, MSCState),
+ MSCState4 = msg_store_read(MsgIds, MSCState3),
%% read them all again - this will hit the cache, not disk
- MSCState2 = msg_store_read(MsgIds, MSCState1),
+ MSCState5 = msg_store_read(MsgIds, MSCState4),
%% remove them all
- ok = rabbit_msg_store:remove(MsgIds),
+ ok = rabbit_msg_store:remove(?PERSISTENT_MSG_STORE, MsgIds),
%% check first half doesn't exist
false = msg_store_contains(false, MsgIds1stHalf),
%% check second half does exist
true = msg_store_contains(true, MsgIds2ndHalf),
%% read the second half again
- MSCState3 = msg_store_read(MsgIds2ndHalf, MSCState2),
+ MSCState6 = msg_store_read(MsgIds2ndHalf, MSCState5),
%% release the second half, just for fun (aka code coverage)
- ok = rabbit_msg_store:release(MsgIds2ndHalf),
+ ok = rabbit_msg_store:release(?PERSISTENT_MSG_STORE, MsgIds2ndHalf),
%% read the second half again, just for fun (aka code coverage)
- MSCState4 = msg_store_read(MsgIds2ndHalf, MSCState3),
- ok = rabbit_msg_store:client_terminate(MSCState4),
+ MSCState7 = msg_store_read(MsgIds2ndHalf, MSCState6),
+ ok = rabbit_msg_store:client_terminate(MSCState7),
%% stop and restart, preserving every other msg in 2nd half
ok = stop_msg_store(),
ok = start_msg_store(fun ([]) -> finished;
@@ -1114,7 +1126,7 @@ test_msg_store() ->
%% check we have the right msgs left
lists:foldl(
fun (MsgId, Bool) ->
- not(Bool = rabbit_msg_store:contains(MsgId))
+ not(Bool = rabbit_msg_store:contains(?PERSISTENT_MSG_STORE, MsgId))
end, false, MsgIds2ndHalf),
%% restart empty
ok = stop_msg_store(),
@@ -1122,11 +1134,12 @@ test_msg_store() ->
%% check we don't contain any of the msgs
false = msg_store_contains(false, MsgIds),
%% publish the first half again
- ok = msg_store_write(MsgIds1stHalf),
+ MSCState8 = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE),
+ {ok, MSCState9} = msg_store_write(MsgIds1stHalf, MSCState8),
%% this should force some sort of sync internally otherwise misread
ok = rabbit_msg_store:client_terminate(
- msg_store_read(MsgIds1stHalf, rabbit_msg_store:client_init())),
- ok = rabbit_msg_store:remove(MsgIds1stHalf),
+ msg_store_read(MsgIds1stHalf, MSCState9)),
+ ok = rabbit_msg_store:remove(?PERSISTENT_MSG_STORE, MsgIds1stHalf),
%% restart empty
ok = stop_msg_store(),
ok = start_msg_store_empty(), %% now safe to reuse msg_ids
@@ -1134,34 +1147,37 @@ test_msg_store() ->
BigCount = 100000,
MsgIdsBig = [msg_id_bin(X) || X <- lists:seq(1, BigCount)],
Payload = << 0:65536 >>,
- ok = lists:foldl(
- fun (MsgId, ok) ->
- rabbit_msg_store:write(MsgId, Payload)
- end, ok, MsgIdsBig),
+ ok = rabbit_msg_store:client_terminate(
+ lists:foldl(
+ fun (MsgId, MSCStateN) ->
+ {ok, MSCStateM} =
+ rabbit_msg_store:write(?PERSISTENT_MSG_STORE, MsgId, Payload, MSCStateN),
+ MSCStateM
+ end, rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE), MsgIdsBig)),
%% now read them to ensure we hit the fast client-side reading
ok = rabbit_msg_store:client_terminate(
lists:foldl(
fun (MsgId, MSCStateM) ->
{{ok, Payload}, MSCStateN} =
- rabbit_msg_store:read(MsgId, MSCStateM),
+ rabbit_msg_store:read(?PERSISTENT_MSG_STORE, MsgId, MSCStateM),
MSCStateN
- end, rabbit_msg_store:client_init(), MsgIdsBig)),
+ end, rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE), MsgIdsBig)),
%% .., then 3s by 1...
ok = lists:foldl(
fun (MsgId, ok) ->
- rabbit_msg_store:remove([msg_id_bin(MsgId)])
+ rabbit_msg_store:remove(?PERSISTENT_MSG_STORE, [msg_id_bin(MsgId)])
end, ok, lists:seq(BigCount, 1, -3)),
%% .., then remove 3s by 2, from the young end first. This hits
%% GC (under 50% good data left, but no empty files. Must GC).
ok = lists:foldl(
fun (MsgId, ok) ->
- rabbit_msg_store:remove([msg_id_bin(MsgId)])
+ rabbit_msg_store:remove(?PERSISTENT_MSG_STORE, [msg_id_bin(MsgId)])
end, ok, lists:seq(BigCount-1, 1, -3)),
%% .., then remove 3s by 3, from the young end first. This hits
%% GC...
ok = lists:foldl(
fun (MsgId, ok) ->
- rabbit_msg_store:remove([msg_id_bin(MsgId)])
+ rabbit_msg_store:remove(?PERSISTENT_MSG_STORE, [msg_id_bin(MsgId)])
end, ok, lists:seq(BigCount-2, 1, -3)),
%% ensure empty
false = msg_store_contains(false, MsgIdsBig),
@@ -1184,20 +1200,25 @@ test_amqqueue(Durable) ->
pid = none}.
empty_test_queue() ->
- ok = rabbit_queue_index:start_msg_store([]),
+ ok = start_transient_msg_store(),
+ ok = rabbit_queue_index:start_persistent_msg_store([]),
{0, Qi1} = rabbit_queue_index:init(test_queue()),
_Qi2 = rabbit_queue_index:terminate_and_erase(Qi1),
ok.
queue_index_publish(SeqIds, Persistent, Qi) ->
- lists:foldl(
- fun (SeqId, {QiN, SeqIdsMsgIdsAcc}) ->
- MsgId = rabbit_guid:guid(),
- QiM = rabbit_queue_index:write_published(MsgId, SeqId, Persistent,
- QiN),
- ok = rabbit_msg_store:write(MsgId, MsgId),
- {QiM, [{SeqId, MsgId} | SeqIdsMsgIdsAcc]}
- end, {Qi, []}, SeqIds).
+ {A, B, MSCStateEnd} =
+ lists:foldl(
+ fun (SeqId, {QiN, SeqIdsMsgIdsAcc, MSCStateN}) ->
+ MsgId = rabbit_guid:guid(),
+ QiM = rabbit_queue_index:write_published(MsgId, SeqId, Persistent,
+ QiN),
+ {ok, MSCStateM} = rabbit_msg_store:write(?PERSISTENT_MSG_STORE, MsgId,
+ MsgId, MSCStateN),
+ {QiM, [{SeqId, MsgId} | SeqIdsMsgIdsAcc], MSCStateM}
+ end, {Qi, [], rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE)}, SeqIds),
+ ok = rabbit_msg_store:client_terminate(MSCStateEnd),
+ {A, B}.
queue_index_deliver(SeqIds, Qi) ->
lists:foldl(
@@ -1236,7 +1257,8 @@ test_queue_index() ->
%% call terminate twice to prove it's idempotent
_Qi5 = rabbit_queue_index:terminate(rabbit_queue_index:terminate(Qi4)),
ok = stop_msg_store(),
- ok = rabbit_queue_index:start_msg_store([test_amqqueue(true)]),
+ ok = rabbit_queue_index:start_persistent_msg_store([test_amqqueue(true)]),
+ ok = start_transient_msg_store(),
%% should get length back as 0, as all the msgs were transient
{0, Qi6} = rabbit_queue_index:init(test_queue()),
{0, SegSize, Qi7} =
@@ -1249,7 +1271,8 @@ test_queue_index() ->
lists:reverse(SeqIdsMsgIdsB)),
_Qi11 = rabbit_queue_index:terminate(Qi10),
ok = stop_msg_store(),
- ok = rabbit_queue_index:start_msg_store([test_amqqueue(true)]),
+ ok = rabbit_queue_index:start_persistent_msg_store([test_amqqueue(true)]),
+ ok = start_transient_msg_store(),
%% should get length back as 10000
LenB = length(SeqIdsB),
{LenB, Qi12} = rabbit_queue_index:init(test_queue()),
@@ -1266,7 +1289,8 @@ test_queue_index() ->
rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi17),
_Qi19 = rabbit_queue_index:terminate(Qi18),
ok = stop_msg_store(),
- ok = rabbit_queue_index:start_msg_store([test_amqqueue(true)]),
+ ok = rabbit_queue_index:start_persistent_msg_store([test_amqqueue(true)]),
+ ok = start_transient_msg_store(),
%% should get length back as 0 because all persistent msgs have been acked
{0, Qi20} = rabbit_queue_index:init(test_queue()),
_Qi21 = rabbit_queue_index:terminate_and_erase(Qi20),
@@ -1307,7 +1331,8 @@ test_queue_index() ->
Qi40 = queue_index_flush_journal(Qi39),
_Qi41 = rabbit_queue_index:terminate_and_erase(Qi40),
ok = stop_msg_store(),
- ok = rabbit_queue_index:start_msg_store([]),
+ ok = rabbit_queue_index:start_persistent_msg_store([]),
+ ok = start_transient_msg_store(),
ok = stop_msg_store(),
passed.
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 0043bb5ff7..b9714f535b 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -154,7 +154,7 @@
rate_timestamp,
len,
on_sync,
- msg_store_read_state
+ msg_store_clients
}).
-include("rabbit.hrl").
@@ -186,7 +186,7 @@
-type(bpqueue() :: any()).
-type(msg_id() :: binary()).
-type(seq_id() :: non_neg_integer()).
--type(ack() :: {'ack_index_and_store', msg_id(), seq_id()}
+-type(ack() :: {'ack_index_and_store', msg_id(), seq_id(), boolean()}
| 'ack_not_on_disk').
-type(vqstate() :: #vqstate {
q1 :: queue(),
@@ -210,7 +210,7 @@
rate_timestamp :: {integer(), integer(), integer()},
len :: non_neg_integer(),
on_sync :: {[ack()], [msg_id()], [{pid(), any()}]},
- msg_store_read_state :: any()
+ msg_store_clients :: {any(), any()}
}).
-spec(init/1 :: (queue_name()) -> vqstate()).
@@ -285,13 +285,15 @@ init(QueueName) ->
rate_timestamp = Now,
len = DeltaCount,
on_sync = {[], [], []},
- msg_store_read_state = rabbit_msg_store:client_init()
+ msg_store_clients = {rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE),
+ rabbit_msg_store:client_init(?TRANSIENT_MSG_STORE)}
},
maybe_deltas_to_betas(State).
terminate(State = #vqstate { index_state = IndexState,
- msg_store_read_state = MSCState }) ->
- rabbit_msg_store:client_terminate(MSCState),
+ msg_store_clients = {MSCStateP, MSCStateT} }) ->
+ rabbit_msg_store:client_terminate(MSCStateP),
+ rabbit_msg_store:client_terminate(MSCStateT),
State #vqstate { index_state = rabbit_queue_index:terminate(IndexState) }.
publish(Msg, State) ->
@@ -303,22 +305,24 @@ publish_delivered(Msg = #basic_message { guid = MsgId,
State = #vqstate { len = 0, index_state = IndexState,
next_seq_id = SeqId,
out_counter = OutCount,
- in_counter = InCount}) ->
+ in_counter = InCount,
+ msg_store_clients = MSCState }) ->
State1 = State #vqstate { out_counter = OutCount + 1,
in_counter = InCount + 1 },
MsgStatus = #msg_status {
msg = Msg, msg_id = MsgId, seq_id = SeqId, is_persistent = IsPersistent,
is_delivered = true, msg_on_disk = false, index_on_disk = false },
- MsgStatus1 = maybe_write_msg_to_disk(false, MsgStatus),
+ {MsgStatus1, MSCState1} = maybe_write_msg_to_disk(false, MsgStatus, MSCState),
+ State2 = State1 #vqstate { msg_store_clients = MSCState1 },
case MsgStatus1 #msg_status.msg_on_disk of
true ->
{#msg_status { index_on_disk = true }, IndexState1} =
maybe_write_index_to_disk(false, MsgStatus1, IndexState),
- {{ack_index_and_store, MsgId, SeqId},
- State1 #vqstate { index_state = IndexState1,
+ {{ack_index_and_store, MsgId, SeqId, IsPersistent},
+ State2 #vqstate { index_state = IndexState1,
next_seq_id = SeqId + 1 }};
false ->
- {ack_not_on_disk, State1}
+ {ack_not_on_disk, State2}
end.
set_queue_ram_duration_target(
@@ -404,9 +408,9 @@ fetch(State =
case IndexOnDisk1 of
true -> true = IsPersistent, %% ASSERTION
true = MsgOnDisk, %% ASSERTION
- {ack_index_and_store, MsgId, SeqId};
+ {ack_index_and_store, MsgId, SeqId, IsPersistent};
false -> ok = case MsgOnDisk andalso not IsPersistent of
- true -> rabbit_msg_store:remove([MsgId]);
+ true -> rabbit_msg_store:remove(find_msg_store(IsPersistent), [MsgId]);
false -> ok
end,
ack_not_on_disk
@@ -419,19 +423,25 @@ fetch(State =
end.
ack(AckTags, State = #vqstate { index_state = IndexState }) ->
- {MsgIds, SeqIds} =
+ {MsgIdsPersistent, MsgIdsTransient, SeqIds} =
lists:foldl(
fun (ack_not_on_disk, Acc) -> Acc;
- ({ack_index_and_store, MsgId, SeqId}, {MsgIds, SeqIds}) ->
- {[MsgId | MsgIds], [SeqId | SeqIds]}
- end, {[], []}, AckTags),
+ ({ack_index_and_store, MsgId, SeqId, true}, {MsgIdsP, MsgIdsT, SeqIds}) ->
+ {[MsgId | MsgIdsP], MsgIdsT, [SeqId | SeqIds]};
+ ({ack_index_and_store, MsgId, SeqId, false}, {MsgIdsP, MsgIdsT, SeqIds}) ->
+ {MsgIdsP, [MsgId | MsgIdsT], [SeqId | SeqIds]}
+ end, {[], [], []}, AckTags),
IndexState1 = case SeqIds of
[] -> IndexState;
_ -> rabbit_queue_index:write_acks(SeqIds, IndexState)
end,
- ok = case MsgIds of
+ ok = case MsgIdsPersistent of
[] -> ok;
- _ -> rabbit_msg_store:remove(MsgIds)
+ _ -> rabbit_msg_store:remove(?PERSISTENT_MSG_STORE, MsgIdsPersistent)
+ end,
+ ok = case MsgIdsTransient of
+ [] -> ok;
+ _ -> rabbit_msg_store:remove(?TRANSIENT_MSG_STORE, MsgIdsTransient)
end,
State #vqstate { index_state = IndexState1 }.
@@ -453,7 +463,7 @@ purge(State = #vqstate { q4 = Q4, index_state = IndexState, len = Len }) ->
%% needs to delete everything that's been delivered and not ack'd.
delete_and_terminate(State) ->
{_PurgeCount, State1 = #vqstate { index_state = IndexState,
- msg_store_read_state = MSCState }} =
+ msg_store_clients = {MSCStateP, MSCStateT} }} =
purge(State),
IndexState1 =
case rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(
@@ -466,7 +476,8 @@ delete_and_terminate(State) ->
IndexState3
end,
IndexState4 = rabbit_queue_index:terminate_and_erase(IndexState1),
- rabbit_msg_store:client_terminate(MSCState),
+ rabbit_msg_store:client_terminate(MSCStateP),
+ rabbit_msg_store:client_terminate(MSCStateT),
State1 #vqstate { index_state = IndexState4 }.
%% [{Msg, AckTag}]
@@ -479,45 +490,52 @@ delete_and_terminate(State) ->
%% msg_store:release so that the cache isn't held full of msgs which
%% are now at the tail of the queue.
requeue(MsgsWithAckTags, State) ->
- {SeqIds, MsgIds, State1 = #vqstate { index_state = IndexState }} =
+ {SeqIds, MsgIdsPersistent, MsgIdsTransient,
+ State1 = #vqstate { index_state = IndexState }} =
lists:foldl(
fun ({Msg = #basic_message { guid = MsgId }, AckTag},
- {SeqIdsAcc, MsgIdsAcc, StateN}) ->
- {SeqIdsAcc1, MsgIdsAcc1, MsgOnDisk} =
+ {SeqIdsAcc, MsgIdsP, MsgIdsT, StateN}) ->
+ {SeqIdsAcc1, MsgIdsP1, MsgIdsT1, MsgOnDisk} =
case AckTag of
ack_not_on_disk ->
- {SeqIdsAcc, MsgIdsAcc, false};
- {ack_index_and_store, MsgId, SeqId} ->
- {[SeqId | SeqIdsAcc], [MsgId | MsgIdsAcc], true}
+ {SeqIdsAcc, MsgIdsP, MsgIdsT, false};
+ {ack_index_and_store, MsgId, SeqId, true} ->
+ {[SeqId | SeqIdsAcc], [MsgId | MsgIdsP], MsgIdsT, true};
+ {ack_index_and_store, MsgId, SeqId, false} ->
+ {[SeqId | SeqIdsAcc], MsgIdsP, [MsgId | MsgIdsT], true}
end,
{_SeqId, StateN1} = publish(Msg, true, MsgOnDisk, StateN),
- {SeqIdsAcc1, MsgIdsAcc1, StateN1}
- end, {[], [], State}, MsgsWithAckTags),
+ {SeqIdsAcc1, MsgIdsP1, MsgIdsT1, StateN1}
+ end, {[], [], [], State}, MsgsWithAckTags),
IndexState1 = case SeqIds of
[] -> IndexState;
_ -> rabbit_queue_index:write_acks(SeqIds, IndexState)
end,
- ok = case MsgIds of
+ ok = case MsgIdsPersistent of
+ [] -> ok;
+ _ -> rabbit_msg_store:release(?PERSISTENT_MSG_STORE, MsgIdsPersistent)
+ end,
+ ok = case MsgIdsTransient of
[] -> ok;
- _ -> rabbit_msg_store:release(MsgIds)
+ _ -> rabbit_msg_store:release(?TRANSIENT_MSG_STORE, MsgIdsTransient)
end,
State1 #vqstate { index_state = IndexState1 }.
tx_publish(Msg = #basic_message { is_persistent = true, guid = MsgId },
- State) ->
+ State = #vqstate { msg_store_clients = MSCState }) ->
MsgStatus = #msg_status {
msg = Msg, msg_id = MsgId, seq_id = undefined, is_persistent = true,
is_delivered = false, msg_on_disk = false, index_on_disk = false },
- #msg_status { msg_on_disk = true } =
- maybe_write_msg_to_disk(false, MsgStatus),
- State;
+ {#msg_status { msg_on_disk = true }, MSCState1} =
+ maybe_write_msg_to_disk(false, MsgStatus, MSCState),
+ State #vqstate { msg_store_clients = MSCState1 };
tx_publish(_Msg, State) ->
State.
tx_rollback(Pubs, State) ->
ok = case persistent_msg_ids(Pubs) of
[] -> ok;
- PP -> rabbit_msg_store:remove(PP)
+ PP -> rabbit_msg_store:remove(?PERSISTENT_MSG_STORE, PP)
end,
State.
@@ -528,6 +546,7 @@ tx_commit(Pubs, AckTags, From, State) ->
PersistentMsgIds ->
Self = self(),
ok = rabbit_msg_store:sync(
+ ?PERSISTENT_MSG_STORE,
PersistentMsgIds,
fun () -> ok = rabbit_amqqueue:tx_commit_msg_store_callback(
Self, Pubs, AckTags, From)
@@ -712,11 +731,15 @@ purge1(Count, State = #vqstate { q3 = Q3, index_state = IndexState }) ->
end.
remove_queue_entries(Fold, Q, IndexState) ->
- {Count, MsgIds, SeqIds, IndexState1} =
- Fold(fun remove_queue_entries1/2, {0, [], [], IndexState}, Q),
- ok = case MsgIds of
+ {Count, MsgIdsPersistent, MsgIdsTransient, SeqIds, IndexState1} =
+ Fold(fun remove_queue_entries1/2, {0, [], [], [], IndexState}, Q),
+ ok = case MsgIdsPersistent of
+ [] -> ok;
+ _ -> rabbit_msg_store:remove(?PERSISTENT_MSG_STORE, MsgIdsPersistent)
+ end,
+ ok = case MsgIdsTransient of
[] -> ok;
- _ -> rabbit_msg_store:remove(MsgIds)
+ _ -> rabbit_msg_store:remove(?TRANSIENT_MSG_STORE, MsgIdsTransient)
end,
IndexState2 =
case SeqIds of
@@ -728,12 +751,14 @@ remove_queue_entries(Fold, Q, IndexState) ->
remove_queue_entries1(
#msg_status { msg_id = MsgId, seq_id = SeqId,
is_delivered = IsDelivered, msg_on_disk = MsgOnDisk,
- index_on_disk = IndexOnDisk },
- {CountN, MsgIdsAcc, SeqIdsAcc, IndexStateN}) ->
- MsgIdsAcc1 = case MsgOnDisk of
- true -> [MsgId | MsgIdsAcc];
- false -> MsgIdsAcc
- end,
+ index_on_disk = IndexOnDisk, is_persistent = IsPersistent },
+ {CountN, MsgIdsP, MsgIdsT, SeqIdsAcc, IndexStateN}) ->
+ {MsgIdsP1, MsgIdsT1} =
+ case {MsgOnDisk, IsPersistent} of
+ {true, true} -> {[MsgId | MsgIdsP], MsgIdsT};
+ {true, false} -> {MsgIdsP, [MsgId | MsgIdsT]};
+ {false, _} -> {MsgIdsP, MsgIdsT}
+ end,
SeqIdsAcc1 = case IndexOnDisk of
true -> [SeqId | SeqIdsAcc];
false -> SeqIdsAcc
@@ -743,13 +768,13 @@ remove_queue_entries1(
SeqId, IndexStateN);
false -> IndexStateN
end,
- {CountN + 1, MsgIdsAcc1, SeqIdsAcc1, IndexStateN1}.
+ {CountN + 1, MsgIdsP1, MsgIdsT1, SeqIdsAcc1, IndexStateN1}.
fetch_from_q3_or_delta(State = #vqstate {
q1 = Q1, q2 = Q2, delta = #delta { count = DeltaCount },
q3 = Q3, q4 = Q4, ram_msg_count = RamMsgCount,
ram_index_count = RamIndexCount,
- msg_store_read_state = MSCState }) ->
+ msg_store_clients = MSCState }) ->
case bpqueue:out(Q3) of
{empty, _Q3} ->
0 = DeltaCount, %% ASSERTION
@@ -761,7 +786,7 @@ fetch_from_q3_or_delta(State = #vqstate {
is_persistent = IsPersistent }}, Q3a} ->
{{ok, Msg = #basic_message { is_persistent = IsPersistent,
guid = MsgId }}, MSCState1} =
- rabbit_msg_store:read(MsgId, MSCState),
+ read_from_msg_store(MSCState, MsgId, IsPersistent),
Q4a = queue:in(MsgStatus #msg_status { msg = Msg }, Q4),
RamIndexCount1 = case IndexOnDisk of
true -> RamIndexCount;
@@ -771,7 +796,7 @@ fetch_from_q3_or_delta(State = #vqstate {
State1 = State #vqstate { q3 = Q3a, q4 = Q4a,
ram_msg_count = RamMsgCount + 1,
ram_index_count = RamIndexCount1,
- msg_store_read_state = MSCState1 },
+ msg_store_clients = MSCState1 },
State2 =
case {bpqueue:is_empty(Q3a), 0 == DeltaCount} of
{true, true} ->
@@ -857,19 +882,22 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, guid = MsgId },
in_counter = InCount + 1 })}.
publish(msg, MsgStatus, State = #vqstate { index_state = IndexState,
- ram_msg_count = RamMsgCount }) ->
- MsgStatus1 = maybe_write_msg_to_disk(false, MsgStatus),
+ ram_msg_count = RamMsgCount,
+ msg_store_clients = MSCState }) ->
+ {MsgStatus1, MSCState1} =
+ maybe_write_msg_to_disk(false, MsgStatus, MSCState),
{MsgStatus2, IndexState1} =
maybe_write_index_to_disk(false, MsgStatus1, IndexState),
State1 = State #vqstate { ram_msg_count = RamMsgCount + 1,
- index_state = IndexState1 },
+ index_state = IndexState1,
+ msg_store_clients = MSCState1 },
store_alpha_entry(MsgStatus2, State1);
-publish(index, MsgStatus, State =
- #vqstate { index_state = IndexState, q1 = Q1,
- ram_index_count = RamIndexCount }) ->
- MsgStatus1 = #msg_status { msg_on_disk = true } =
- maybe_write_msg_to_disk(true, MsgStatus),
+publish(index, MsgStatus, State = #vqstate { index_state = IndexState, q1 = Q1,
+ ram_index_count = RamIndexCount,
+ msg_store_clients = MSCState }) ->
+ {MsgStatus1 = #msg_status { msg_on_disk = true }, MSCState1} =
+ maybe_write_msg_to_disk(true, MsgStatus, MSCState),
ForceIndex = should_force_index_to_disk(State),
{MsgStatus2, IndexState1} =
maybe_write_index_to_disk(ForceIndex, MsgStatus1, IndexState),
@@ -878,15 +906,16 @@ publish(index, MsgStatus, State =
false -> RamIndexCount + 1
end,
State1 = State #vqstate { index_state = IndexState1,
- ram_index_count = RamIndexCount1 },
+ ram_index_count = RamIndexCount1,
+ msg_store_clients = MSCState1 },
true = queue:is_empty(Q1), %% ASSERTION
store_beta_entry(MsgStatus2, State1);
publish(neither, MsgStatus = #msg_status { seq_id = SeqId }, State =
- #vqstate { index_state = IndexState, q1 = Q1, q2 = Q2,
- delta = Delta }) ->
- MsgStatus1 = #msg_status { msg_on_disk = true } =
- maybe_write_msg_to_disk(true, MsgStatus),
+ #vqstate { index_state = IndexState, q1 = Q1, q2 = Q2,
+ delta = Delta, msg_store_clients = MSCState }) ->
+ {MsgStatus1 = #msg_status { msg_on_disk = true }, MSCState1} =
+ maybe_write_msg_to_disk(true, MsgStatus, MSCState),
{#msg_status { index_on_disk = true }, IndexState1} =
maybe_write_index_to_disk(true, MsgStatus1, IndexState),
true = queue:is_empty(Q1) andalso bpqueue:is_empty(Q2), %% ASSERTION
@@ -898,7 +927,8 @@ publish(neither, MsgStatus = #msg_status { seq_id = SeqId }, State =
Delta1 = #delta { start_seq_id = DeltaSeqId, count = 1,
end_seq_id = SeqId + 1 },
State #vqstate { index_state = IndexState1,
- delta = combine_deltas(Delta, Delta1) }.
+ delta = combine_deltas(Delta, Delta1),
+ msg_store_clients = MSCState1 }.
store_alpha_entry(MsgStatus, State =
#vqstate { q1 = Q1, q2 = Q2,
@@ -925,17 +955,42 @@ store_beta_entry(MsgStatus = #msg_status { msg_on_disk = true,
State #vqstate { q2 = bpqueue:in(IndexOnDisk, MsgStatus1, Q2) }
end.
+find_msg_store(true) -> ?PERSISTENT_MSG_STORE;
+find_msg_store(false) -> ?TRANSIENT_MSG_STORE.
+
+read_from_msg_store({MSCStateP, MSCStateT}, MsgId, true) ->
+ {Res, MSCStateP1} =
+ rabbit_msg_store:read(?PERSISTENT_MSG_STORE, MsgId, MSCStateP),
+ {Res, {MSCStateP1, MSCStateT}};
+read_from_msg_store({MSCStateP, MSCStateT}, MsgId, false) ->
+ {Res, MSCStateT1} =
+ rabbit_msg_store:read(?TRANSIENT_MSG_STORE, MsgId, MSCStateT),
+ {Res, {MSCStateP, MSCStateT1}}.
+
maybe_write_msg_to_disk(_Force, MsgStatus =
- #msg_status { msg_on_disk = true }) ->
- MsgStatus;
+ #msg_status { msg_on_disk = true }, MSCState) ->
+ {MsgStatus, MSCState};
maybe_write_msg_to_disk(Force, MsgStatus = #msg_status {
msg = Msg, msg_id = MsgId,
- is_persistent = IsPersistent })
+ is_persistent = IsPersistent },
+ {MSCStateP, MSCStateT})
when Force orelse IsPersistent ->
- ok = rabbit_msg_store:write(MsgId, ensure_binary_properties(Msg)),
- MsgStatus #msg_status { msg_on_disk = true };
-maybe_write_msg_to_disk(_Force, MsgStatus) ->
- MsgStatus.
+ MSCState1 =
+ case IsPersistent of
+ true ->
+ {ok, MSCStateP1} = rabbit_msg_store:write(
+ ?PERSISTENT_MSG_STORE, MsgId,
+ ensure_binary_properties(Msg), MSCStateP),
+ {MSCStateP1, MSCStateT};
+ false ->
+ {ok, MSCStateT1} = rabbit_msg_store:write(
+ ?TRANSIENT_MSG_STORE, MsgId,
+ ensure_binary_properties(Msg), MSCStateT),
+ {MSCStateP, MSCStateT1}
+ end,
+ {MsgStatus #msg_status { msg_on_disk = true }, MSCState1};
+maybe_write_msg_to_disk(_Force, MsgStatus, MSCState) ->
+ {MsgStatus, MSCState}.
maybe_write_index_to_disk(_Force, MsgStatus =
#msg_status { index_on_disk = true }, IndexState) ->
@@ -1082,11 +1137,12 @@ maybe_push_alphas_to_betas(_Generator, _Consumer, _Q, State =
maybe_push_alphas_to_betas(
Generator, Consumer, Q, State =
#vqstate { ram_msg_count = RamMsgCount, ram_index_count = RamIndexCount,
- index_state = IndexState }) ->
+ index_state = IndexState, msg_store_clients = MSCState }) ->
case Generator(Q) of
{empty, _Q} -> State;
{{value, MsgStatus}, Qa} ->
- MsgStatus1 = maybe_write_msg_to_disk(true, MsgStatus),
+ {MsgStatus1, MSCState1} = maybe_write_msg_to_disk(true, MsgStatus,
+ MSCState),
ForceIndex = should_force_index_to_disk(State),
{MsgStatus2, IndexState1} =
maybe_write_index_to_disk(ForceIndex, MsgStatus1, IndexState),
@@ -1096,7 +1152,8 @@ maybe_push_alphas_to_betas(
end,
State1 = State #vqstate { ram_msg_count = RamMsgCount - 1,
ram_index_count = RamIndexCount1,
- index_state = IndexState1 },
+ index_state = IndexState1,
+ msg_store_clients = MSCState1 },
maybe_push_alphas_to_betas(Generator, Consumer, Qa,
Consumer(MsgStatus2, Qa, State1))
end.