diff options
| author | Matthew Sackman <matthew@lshift.net> | 2010-04-01 19:08:04 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2010-04-01 19:08:04 +0100 |
| commit | 72519fca997e48195c1ef56e26597fa3395c99d1 (patch) | |
| tree | c1eb0a97ece1edc59ce5d5388d66530fc3c93cba | |
| parent | ae47f65d16cb229e4298aea435e543f1ccd55f16 (diff) | |
| download | rabbitmq-server-git-72519fca997e48195c1ef56e26597fa3395c99d1.tar.gz | |
Split msg_store into two msg stores, one for persistent and one for transient. This is the first step in trying to make startup and recovery of data on disk much faster.
| -rw-r--r-- | include/rabbit.hrl | 3 | ||||
| -rw-r--r-- | include/rabbit_msg_store.hrl | 5 | ||||
| -rw-r--r-- | src/rabbit_amqqueue.erl | 8 | ||||
| -rw-r--r-- | src/rabbit_msg_store.erl | 489 | ||||
| -rw-r--r-- | src/rabbit_msg_store_gc.erl | 45 | ||||
| -rw-r--r-- | src/rabbit_queue_index.erl | 17 | ||||
| -rw-r--r-- | src/rabbit_sup.erl | 7 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 131 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 205 |
9 files changed, 532 insertions, 378 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl index df282029e5..e9fa6e376c 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -177,6 +177,9 @@ -define(MAX_WAIT, 16#ffffffff). +-define(PERSISTENT_MSG_STORE, msg_store_persistent). +-define(TRANSIENT_MSG_STORE, msg_store_transient). + -ifdef(debug). -define(LOGDEBUG0(F), rabbit_log:debug(F)). -define(LOGDEBUG(F,A), rabbit_log:debug(F,A)). diff --git a/include/rabbit_msg_store.hrl b/include/rabbit_msg_store.hrl index 6f557c1835..2c2735d483 100644 --- a/include/rabbit_msg_store.hrl +++ b/include/rabbit_msg_store.hrl @@ -49,8 +49,3 @@ -define(FILE_SIZE_LIMIT, (16*1024*1024)). -define(HANDLE_CACHE_BUFFER_SIZE, 1048576). %% 1MB - --define(FILE_SUMMARY_ETS_NAME, rabbit_msg_store_file_summary). --define(CACHE_ETS_NAME, rabbit_msg_store_cache). --define(FILE_HANDLES_ETS_NAME, rabbit_msg_store_file_handles). --define(CUR_FILE_CACHE_ETS_NAME, rabbit_msg_store_cur_file). diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 9899354022..f0540c93a2 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -65,8 +65,7 @@ -type(qfun(A) :: fun ((amqqueue()) -> A)). -type(ok_or_errors() :: 'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}). --type(seq_id() :: non_neg_integer()). --type(acktag() :: ('ack_not_on_disk' | {'ack_index_and_store', msg_id(), seq_id()})). +-type(acktag() :: any()). -spec(start/0 :: () -> 'ok'). -spec(declare/4 :: (queue_name(), boolean(), boolean(), amqp_table()) -> @@ -129,8 +128,11 @@ %%---------------------------------------------------------------------------- start() -> + ok = rabbit_sup:start_child(?TRANSIENT_MSG_STORE, rabbit_msg_store, + [?TRANSIENT_MSG_STORE, rabbit_mnesia:dir(), + fun (ok) -> finished end, ok]), DurableQueues = find_durable_queues(), - ok = rabbit_queue_index:start_msg_store(DurableQueues), + ok = rabbit_queue_index:start_persistent_msg_store(DurableQueues), {ok,_} = supervisor:start_child( rabbit_sup, {rabbit_amqqueue_sup, diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 86f1d9c936..a33b1a3470 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -33,16 +33,14 @@ -behaviour(gen_server2). --export([start_link/3, write/2, read/2, contains/1, remove/1, release/1, - sync/2, client_init/0, client_terminate/1]). +-export([start_link/4, write/4, read/3, contains/2, remove/2, release/2, + sync/3, client_init/1, client_terminate/1]). --export([sync/0, gc_done/3, set_maximum_since_use/1]). %% internal +-export([sync/1, gc_done/4, set_maximum_since_use/2]). %% internal -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3, handle_pre_hibernate/1]). --define(SERVER, ?MODULE). - -define(SYNC_INTERVAL, 5). %% milliseconds -define(GEOMETRIC_P, 0.3). %% parameter to geometric distribution rng @@ -62,44 +60,58 @@ sum_valid_data, %% sum of valid data in all files sum_file_size, %% sum of file sizes pending_gc_completion, %% things to do once GC completes - gc_active %% is the GC currently working? + gc_active, %% is the GC currently working? + gc_pid, %% pid of our GC + file_handles_ets, %% tid of the shared file handles table + file_summary_ets, %% tid of the file summary table + dedup_cache_ets, %% tid of dedup cache table + cur_file_cache_ets %% tid of current file cache table }). -record(client_msstate, { file_handle_cache, index_state, index_module, - dir + dir, + file_handles_ets, + file_summary_ets, + dedup_cache_ets, + cur_file_cache_ets }). %%---------------------------------------------------------------------------- -ifdef(use_specs). +-type(server() :: pid() | atom()). -type(msg_id() :: binary()). -type(msg() :: any()). -type(file_path() :: any()). -type(file_num() :: non_neg_integer()). --type(client_msstate() :: #client_msstate { file_handle_cache :: dict(), - index_state :: any(), - index_module :: atom(), - dir :: file_path() }). - --spec(start_link/3 :: - (file_path(), +-type(client_msstate() :: #client_msstate { file_handle_cache :: dict(), + index_state :: any(), + index_module :: atom(), + dir :: file_path(), + file_handles_ets :: tid(), + file_summary_ets :: tid(), + dedup_cache_ets :: tid(), + cur_file_cache_ets :: tid() }). + +-spec(start_link/4 :: + (atom(), file_path(), (fun ((A) -> 'finished' | {msg_id(), non_neg_integer(), A})), A) -> {'ok', pid()} | 'ignore' | {'error', any()}). --spec(write/2 :: (msg_id(), msg()) -> 'ok'). -%% -spec(read/1 :: (msg_id()) -> {'ok', msg()} | 'not_found'). --spec(read/2 :: (msg_id(), client_msstate()) -> +-spec(write/4 :: (server(), msg_id(), msg(), client_msstate()) -> + {'ok', client_msstate()}). +-spec(read/3 :: (server(), msg_id(), client_msstate()) -> {{'ok', msg()} | 'not_found', client_msstate()}). --spec(contains/1 :: (msg_id()) -> boolean()). --spec(remove/1 :: ([msg_id()]) -> 'ok'). --spec(release/1 :: ([msg_id()]) -> 'ok'). --spec(sync/2 :: ([msg_id()], fun (() -> any())) -> 'ok'). --spec(gc_done/3 :: (non_neg_integer(), file_num(), file_num()) -> 'ok'). --spec(set_maximum_since_use/1 :: (non_neg_integer()) -> 'ok'). --spec(client_init/0 :: () -> client_msstate()). +-spec(contains/2 :: (server(), msg_id()) -> boolean()). +-spec(remove/2 :: (server(), [msg_id()]) -> 'ok'). +-spec(release/2 :: (server(), [msg_id()]) -> 'ok'). +-spec(sync/3 :: (server(), [msg_id()], fun (() -> any())) -> 'ok'). +-spec(gc_done/4 :: (server(), non_neg_integer(), file_num(), file_num()) -> 'ok'). +-spec(set_maximum_since_use/2 :: (server(), non_neg_integer()) -> 'ok'). +-spec(client_init/1 :: (server()) -> client_msstate()). -spec(client_terminate/1 :: (client_msstate()) -> 'ok'). -endif. @@ -264,28 +276,32 @@ %% public API %%---------------------------------------------------------------------------- -start_link(Dir, MsgRefDeltaGen, MsgRefDeltaGenInit) -> - gen_server2:start_link({local, ?SERVER}, ?MODULE, - [Dir, MsgRefDeltaGen, MsgRefDeltaGenInit], +start_link(Server, Dir, MsgRefDeltaGen, MsgRefDeltaGenInit) -> + gen_server2:start_link({local, Server}, ?MODULE, + [Server, Dir, MsgRefDeltaGen, MsgRefDeltaGenInit], [{timeout, infinity}]). -write(MsgId, Msg) -> - ok = add_to_cache(MsgId, Msg), - gen_server2:cast(?SERVER, {write, MsgId, Msg}). +write(Server, MsgId, Msg, CState = + #client_msstate { cur_file_cache_ets = CurFileCacheEts }) -> + ok = add_to_cache(CurFileCacheEts, MsgId, Msg), + {gen_server2:cast(Server, {write, MsgId, Msg}), CState}. -read(MsgId, CState) -> +read(Server, MsgId, CState = + #client_msstate { dedup_cache_ets = DedupCacheEts, + cur_file_cache_ets = CurFileCacheEts }) -> %% 1. Check the dedup cache - case fetch_and_increment_cache(MsgId) of + case fetch_and_increment_cache(DedupCacheEts, MsgId) of not_found -> %% 2. Check the cur file cache - case ets:lookup(?CUR_FILE_CACHE_ETS_NAME, MsgId) of + case ets:lookup(CurFileCacheEts, MsgId) of [] -> Defer = fun() -> {gen_server2:pcall( - ?SERVER, 2, {read, MsgId}, infinity), + Server, 2, {read, MsgId}, infinity), CState} end, case index_lookup(MsgId, CState) of not_found -> Defer(); - MsgLocation -> client_read1(MsgLocation, Defer, CState) + MsgLocation -> client_read1(Server, MsgLocation, Defer, + CState) end; [{MsgId, Msg, _CacheRefCount}] -> %% Although we've found it, we don't know the @@ -296,25 +312,29 @@ read(MsgId, CState) -> {{ok, Msg}, CState} end. -contains(MsgId) -> gen_server2:call(?SERVER, {contains, MsgId}, infinity). -remove(MsgIds) -> gen_server2:cast(?SERVER, {remove, MsgIds}). -release(MsgIds) -> gen_server2:cast(?SERVER, {release, MsgIds}). -sync(MsgIds, K) -> gen_server2:cast(?SERVER, {sync, MsgIds, K}). -sync() -> gen_server2:pcast(?SERVER, 8, sync). %% internal - -gc_done(Reclaimed, Source, Destination) -> - gen_server2:pcast(?SERVER, 8, {gc_done, Reclaimed, Source, Destination}). - -set_maximum_since_use(Age) -> - gen_server2:pcast(?SERVER, 8, {set_maximum_since_use, Age}). - -client_init() -> - {IState, IModule, Dir} = - gen_server2:call(?SERVER, new_client_state, infinity), - #client_msstate { file_handle_cache = dict:new(), - index_state = IState, - index_module = IModule, - dir = Dir }. +contains(Server, MsgId) -> gen_server2:call(Server, {contains, MsgId}, infinity). +remove(Server, MsgIds) -> gen_server2:cast(Server, {remove, MsgIds}). +release(Server, MsgIds) -> gen_server2:cast(Server, {release, MsgIds}). +sync(Server, MsgIds, K) -> gen_server2:cast(Server, {sync, MsgIds, K}). +sync(Server) -> gen_server2:pcast(Server, 8, sync). %% internal + +gc_done(Server, Reclaimed, Source, Destination) -> + gen_server2:pcast(Server, 8, {gc_done, Reclaimed, Source, Destination}). + +set_maximum_since_use(Server, Age) -> + gen_server2:pcast(Server, 8, {set_maximum_since_use, Age}). + +client_init(Server) -> + {IState, IModule, Dir, FileHandlesEts, FileSummaryEts, DedupCacheEts, + CurFileCacheEts} = gen_server2:call(Server, new_client_state, infinity), + #client_msstate { file_handle_cache = dict:new(), + index_state = IState, + index_module = IModule, + dir = Dir, + file_handles_ets = FileHandlesEts, + file_summary_ets = FileSummaryEts, + dedup_cache_ets = DedupCacheEts, + cur_file_cache_ets = CurFileCacheEts }. client_terminate(CState) -> close_all_handles(CState), @@ -324,53 +344,58 @@ client_terminate(CState) -> %% Client-side-only helpers %%---------------------------------------------------------------------------- -add_to_cache(MsgId, Msg) -> - case ets:insert_new(?CUR_FILE_CACHE_ETS_NAME, {MsgId, Msg, 1}) of +add_to_cache(CurFileCacheEts, MsgId, Msg) -> + case ets:insert_new(CurFileCacheEts, {MsgId, Msg, 1}) of true -> ok; false -> try - ets:update_counter(?CUR_FILE_CACHE_ETS_NAME, MsgId, {3, +1}), + ets:update_counter(CurFileCacheEts, MsgId, {3, +1}), ok - catch error:badarg -> add_to_cache(MsgId, Msg) + catch error:badarg -> add_to_cache(CurFileCacheEts, MsgId, Msg) end end. -client_read1(MsgLocation = #msg_location { msg_id = MsgId, file = File }, Defer, - CState) -> - case ets:lookup(?FILE_SUMMARY_ETS_NAME, File) of +client_read1(Server, #msg_location { msg_id = MsgId, file = File } = + MsgLocation, Defer, CState = + #client_msstate { file_summary_ets = FileSummaryEts }) -> + case ets:lookup(FileSummaryEts, File) of [] -> %% File has been GC'd and no longer exists. Go around again. - read(MsgId, CState); + read(Server, MsgId, CState); [#file_summary { locked = Locked, right = Right }] -> - client_read2(Locked, Right, MsgLocation, Defer, CState) + client_read2(Server, Locked, Right, MsgLocation, Defer, CState) end. -client_read2(false, undefined, #msg_location { - msg_id = MsgId, ref_count = RefCount }, Defer, CState) -> - case ets:lookup(?CUR_FILE_CACHE_ETS_NAME, MsgId) of +client_read2(_Server, false, undefined, + #msg_location { msg_id = MsgId, ref_count = RefCount }, Defer, + CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts, + dedup_cache_ets = DedupCacheEts }) -> + case ets:lookup(CurFileCacheEts, MsgId) of [] -> Defer(); %% may have rolled over [{MsgId, Msg, _CacheRefCount}] -> - ok = maybe_insert_into_cache(RefCount, MsgId, Msg), + ok = maybe_insert_into_cache(DedupCacheEts, RefCount, MsgId, Msg), {{ok, Msg}, CState} end; -client_read2(true, _Right, _MsgLocation, Defer, _CState) -> +client_read2(_Server, true, _Right, _MsgLocation, Defer, _CState) -> %% Of course, in the mean time, the GC could have run and our msg %% is actually in a different file, unlocked. However, defering is %% the safest and simplest thing to do. Defer(); -client_read2(false, _Right, #msg_location { - msg_id = MsgId, ref_count = RefCount, file = File }, - Defer, CState) -> +client_read2(Server, false, _Right, + #msg_location { msg_id = MsgId, ref_count = RefCount, file = File }, + Defer, CState = + #client_msstate { file_handles_ets = FileHandlesEts, + file_summary_ets = FileSummaryEts, + dedup_cache_ets = DedupCacheEts }) -> %% It's entirely possible that everything we're doing from here on %% is for the wrong file, or a non-existent file, as a GC may have %% finished. - try ets:update_counter(?FILE_SUMMARY_ETS_NAME, File, - {#file_summary.readers, +1}) + try ets:update_counter(FileSummaryEts, File, {#file_summary.readers, +1}) catch error:badarg -> %% the File has been GC'd and deleted. Go around. - read(MsgId, CState) + read(Server, MsgId, CState) end, - Release = fun() -> ets:update_counter(?FILE_SUMMARY_ETS_NAME, File, + Release = fun() -> ets:update_counter(FileSummaryEts, File, {#file_summary.readers, -1}) end, %% If a GC hasn't already started, it won't start now. Need to @@ -378,7 +403,7 @@ client_read2(false, _Right, #msg_location { %% between lookup and update_counter (thus GC started before our %% +1). [#file_summary { locked = Locked }] = - ets:lookup(?FILE_SUMMARY_ETS_NAME, File), + ets:lookup(FileSummaryEts, File), case Locked of true -> %% If we get a badarg here, then the GC has finished and @@ -390,7 +415,7 @@ client_read2(false, _Right, #msg_location { %% readers, msg_store ets:deletes (and unlocks the dest) try Release(), Defer() - catch error:badarg -> read(MsgId, CState) + catch error:badarg -> read(Server, MsgId, CState) end; false -> %% Ok, we're definitely safe to continue - a GC can't @@ -410,23 +435,25 @@ client_read2(false, _Right, #msg_location { MsgLocation = #msg_location { file = File } -> %% Still the same file. %% This is fine to fail (already exists) - ets:insert_new( - ?FILE_HANDLES_ETS_NAME, {{self(), File}, open}), + ets:insert_new(FileHandlesEts, {{self(), File}, open}), CState1 = close_all_indicated(CState), - {Msg, CState2} = read_from_disk(MsgLocation, CState1), - ok = maybe_insert_into_cache(RefCount, MsgId, Msg), + {Msg, CState2} = + read_from_disk(MsgLocation, CState1, DedupCacheEts), + ok = maybe_insert_into_cache(DedupCacheEts, RefCount, MsgId, + Msg), Release(), %% this MUST NOT fail with badarg {{ok, Msg}, CState2}; MsgLocation -> %% different file! Release(), %% this MUST NOT fail with badarg - client_read1(MsgLocation, Defer, CState) + client_read1(Server, MsgLocation, Defer, CState) end end. -close_all_indicated(CState) -> - Objs = ets:match_object(?FILE_HANDLES_ETS_NAME, {{self(), '_'}, close}), +close_all_indicated(#client_msstate { file_handles_ets = FileHandlesEts } = + CState) -> + Objs = ets:match_object(FileHandlesEts, {{self(), '_'}, close}), lists:foldl(fun ({Key = {_Self, File}, close}, CStateM) -> - true = ets:delete(?FILE_HANDLES_ETS_NAME, Key), + true = ets:delete(FileHandlesEts, Key), close_handle(File, CStateM) end, CState, Objs). @@ -434,12 +461,13 @@ close_all_indicated(CState) -> %% gen_server callbacks %%---------------------------------------------------------------------------- -init([Dir, MsgRefDeltaGen, MsgRefDeltaGenInit]) -> +init([Server, BaseDir, MsgRefDeltaGen, MsgRefDeltaGenInit]) -> process_flag(trap_exit, true), - ok = - file_handle_cache:register_callback(?MODULE, set_maximum_since_use, []), + ok = file_handle_cache:register_callback(?MODULE, set_maximum_since_use, + [self()]), + Dir = filename:join(BaseDir, atom_to_list(Server)), ok = filelib:ensure_dir(filename:join(Dir, "nothing")), {ok, IndexModule} = application:get_env(msg_store_index_module), @@ -448,28 +476,32 @@ init([Dir, MsgRefDeltaGen, MsgRefDeltaGenInit]) -> IndexState = IndexModule:init(Dir), InitFile = 0, - ?FILE_SUMMARY_ETS_NAME = ets:new(?FILE_SUMMARY_ETS_NAME, - [ordered_set, public, named_table, - {keypos, #file_summary.file}]), - ?CACHE_ETS_NAME = ets:new(?CACHE_ETS_NAME, [set, public, named_table]), - ?FILE_HANDLES_ETS_NAME = ets:new(?FILE_HANDLES_ETS_NAME, - [ordered_set, public, named_table]), - ?CUR_FILE_CACHE_ETS_NAME = ets:new(?CUR_FILE_CACHE_ETS_NAME, - [set, public, named_table]), - State = - #msstate { dir = Dir, - index_module = IndexModule, - index_state = IndexState, - current_file = InitFile, - current_file_handle = undefined, - file_handle_cache = dict:new(), - on_sync = [], - sync_timer_ref = undefined, - sum_valid_data = 0, - sum_file_size = 0, - pending_gc_completion = [], - gc_active = false - }, + FileSummaryEts = ets:new(rabbit_msg_store_file_summary, + [ordered_set, public, + {keypos, #file_summary.file}]), + DedupCacheEts = ets:new(rabbit_msg_store_dedup_cache, [set, public]), + FileHandlesEts = ets:new(rabbit_msg_store_shared_file_handles, + [ordered_set, public]), + CurFileCacheEts = ets:new(rabbit_msg_store_cur_file, [set, public]), + + State = #msstate { dir = Dir, + index_module = IndexModule, + index_state = IndexState, + current_file = InitFile, + current_file_handle = undefined, + file_handle_cache = dict:new(), + on_sync = [], + sync_timer_ref = undefined, + sum_valid_data = 0, + sum_file_size = 0, + pending_gc_completion = [], + gc_active = false, + gc_pid = undefined, + file_handles_ets = FileHandlesEts, + file_summary_ets = FileSummaryEts, + dedup_cache_ets = DedupCacheEts, + cur_file_cache_ets = CurFileCacheEts + }, ok = count_msg_refs(MsgRefDeltaGen, MsgRefDeltaGenInit, State), FileNames = @@ -490,9 +522,11 @@ init([Dir, MsgRefDeltaGen, MsgRefDeltaGenInit]) -> {ok, Offset} = file_handle_cache:position(FileHdl, Offset), ok = file_handle_cache:truncate(FileHdl), - {ok, _Pid} = rabbit_msg_store_gc:start_link(Dir, IndexState, IndexModule), + {ok, GCPid} = rabbit_msg_store_gc:start_link(Dir, IndexState, IndexModule, + FileSummaryEts), - {ok, State1 #msstate { current_file_handle = FileHdl }, hibernate, + {ok, State1 #msstate { current_file_handle = FileHdl, + gc_pid = GCPid }, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. handle_call({read, MsgId}, From, State) -> @@ -504,16 +538,23 @@ handle_call({contains, MsgId}, From, State) -> noreply(State1); handle_call(new_client_state, _From, - State = #msstate { index_state = IndexState, dir = Dir, - index_module = IndexModule }) -> - reply({IndexState, IndexModule, Dir}, State). + State = #msstate { index_state = IndexState, dir = Dir, + index_module = IndexModule, + file_handles_ets = FileHandlesEts, + file_summary_ets = FileSummaryEts, + dedup_cache_ets = DedupCacheEts, + cur_file_cache_ets = CurFileCacheEts }) -> + reply({IndexState, IndexModule, Dir, FileHandlesEts, FileSummaryEts, + DedupCacheEts, CurFileCacheEts}, State). handle_cast({write, MsgId, Msg}, State = #msstate { current_file_handle = CurHdl, current_file = CurFile, sum_valid_data = SumValid, - sum_file_size = SumFileSize }) -> - true = 0 =< ets:update_counter(?CUR_FILE_CACHE_ETS_NAME, MsgId, {3, -1}), + sum_file_size = SumFileSize, + file_summary_ets = FileSummaryEts, + cur_file_cache_ets = CurFileCacheEts }) -> + true = 0 =< ets:update_counter(CurFileCacheEts, MsgId, {3, -1}), case index_lookup(MsgId, State) of not_found -> %% New message, lots to do @@ -528,7 +569,7 @@ handle_cast({write, MsgId, Msg}, right = undefined, locked = false, file_size = FileSize }] = - ets:lookup(?FILE_SUMMARY_ETS_NAME, CurFile), + ets:lookup(FileSummaryEts, CurFile), ValidTotalSize1 = ValidTotalSize + TotalSize, ContiguousTop1 = if CurOffset =:= ContiguousTop -> %% can't be any holes in this file @@ -536,7 +577,7 @@ handle_cast({write, MsgId, Msg}, true -> ContiguousTop end, true = ets:update_element( - ?FILE_SUMMARY_ETS_NAME, + FileSummaryEts, CurFile, [{#file_summary.valid_total_size, ValidTotalSize1}, {#file_summary.contiguous_top, ContiguousTop1}, @@ -562,8 +603,10 @@ handle_cast({remove, MsgIds}, State) -> State, MsgIds), noreply(maybe_compact(State1)); -handle_cast({release, MsgIds}, State) -> - lists:foreach(fun (MsgId) -> decrement_cache(MsgId) end, MsgIds), +handle_cast({release, MsgIds}, State = + #msstate { dedup_cache_ets = DedupCacheEts }) -> + lists:foreach( + fun (MsgId) -> decrement_cache(DedupCacheEts, MsgId) end, MsgIds), noreply(State); handle_cast({sync, MsgIds, K}, @@ -582,32 +625,34 @@ handle_cast({sync, MsgIds, K}, end; handle_cast(sync, State) -> - noreply(sync(State)); + noreply(internal_sync(State)); handle_cast({gc_done, Reclaimed, Source, Dest}, State = #msstate { sum_file_size = SumFileSize, - gc_active = {Source, Dest} }) -> + gc_active = {Source, Dest}, + file_handles_ets = FileHandlesEts, + file_summary_ets = FileSummaryEts }) -> %% GC done, so now ensure that any clients that have open fhs to %% those files close them before using them again. This has to be %% done here, and not when starting up the GC, because if done %% when starting up the GC, the client could find the close, and %% close and reopen the fh, whilst the GC is waiting for readers %% to disappear, before it's actually done the GC. - true = mark_handle_to_close(Source), - true = mark_handle_to_close(Dest), + true = mark_handle_to_close(FileHandlesEts, Source), + true = mark_handle_to_close(FileHandlesEts, Dest), %% we always move data left, so Source has gone and was on the %% right, so need to make dest = source.right.left, and also %% dest.right = source.right [#file_summary { left = Dest, right = SourceRight, locked = true, readers = 0 }] = - ets:lookup(?FILE_SUMMARY_ETS_NAME, Source), + ets:lookup(FileSummaryEts, Source), %% this could fail if SourceRight == undefined - ets:update_element(?FILE_SUMMARY_ETS_NAME, SourceRight, + ets:update_element(FileSummaryEts, SourceRight, {#file_summary.left, Dest}), - true = ets:update_element(?FILE_SUMMARY_ETS_NAME, Dest, + true = ets:update_element(FileSummaryEts, Dest, [{#file_summary.locked, false}, {#file_summary.right, SourceRight}]), - true = ets:delete(?FILE_SUMMARY_ETS_NAME, Source), + true = ets:delete(FileSummaryEts, Source), noreply(run_pending( State #msstate { sum_file_size = SumFileSize - Reclaimed, gc_active = false })); @@ -617,28 +662,33 @@ handle_cast({set_maximum_since_use, Age}, State) -> noreply(State). handle_info(timeout, State) -> - noreply(sync(State)); + noreply(internal_sync(State)); handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}. -terminate(_Reason, State = #msstate { index_state = IndexState, - index_module = IndexModule, - current_file_handle = FileHdl }) -> +terminate(_Reason, State = #msstate { index_state = IndexState, + index_module = IndexModule, + current_file_handle = FileHdl, + gc_pid = GCPid, + file_handles_ets = FileHandlesEts, + file_summary_ets = FileSummaryEts, + dedup_cache_ets = DedupCacheEts, + cur_file_cache_ets = CurFileCacheEts }) -> %% stop the gc first, otherwise it could be working and we pull %% out the ets tables from under it. - ok = rabbit_msg_store_gc:stop(), + ok = rabbit_msg_store_gc:stop(GCPid), State1 = case FileHdl of undefined -> State; - _ -> State2 = sync(State), + _ -> State2 = internal_sync(State), file_handle_cache:close(FileHdl), State2 end, State3 = close_all_handles(State1), - ets:delete(?FILE_SUMMARY_ETS_NAME), - ets:delete(?CACHE_ETS_NAME), - ets:delete(?FILE_HANDLES_ETS_NAME), - ets:delete(?CUR_FILE_CACHE_ETS_NAME), + ets:delete(FileSummaryEts), + ets:delete(DedupCacheEts), + ets:delete(FileHandlesEts), + ets:delete(CurFileCacheEts), IndexModule:terminate(IndexState), State3 #msstate { index_state = undefined, current_file_handle = undefined }. @@ -686,7 +736,7 @@ sort_file_names(FileNames) -> lists:sort(fun (A, B) -> filename_to_num(A) < filename_to_num(B) end, FileNames). -sync(State = #msstate { current_file_handle = CurHdl, +internal_sync(State = #msstate { current_file_handle = CurHdl, on_sync = Syncs }) -> State1 = stop_sync_timer(State), case Syncs of @@ -697,12 +747,13 @@ sync(State = #msstate { current_file_handle = CurHdl, State1 #msstate { on_sync = [] } end. -read_message(MsgId, From, State) -> +read_message(MsgId, From, State = + #msstate { dedup_cache_ets = DedupCacheEts }) -> case index_lookup(MsgId, State) of not_found -> gen_server2:reply(From, not_found), State; MsgLocation -> - case fetch_and_increment_cache(MsgId) of + case fetch_and_increment_cache(DedupCacheEts, MsgId) of not_found -> read_message1(From, MsgLocation, State); Msg -> @@ -714,33 +765,36 @@ read_message(MsgId, From, State) -> read_message1(From, #msg_location { msg_id = MsgId, ref_count = RefCount, file = File, offset = Offset } = MsgLoc, State = #msstate { current_file = CurFile, - current_file_handle = CurHdl }) -> + current_file_handle = CurHdl, + file_summary_ets = FileSummaryEts, + dedup_cache_ets = DedupCacheEts, + cur_file_cache_ets = CurFileCacheEts }) -> case File =:= CurFile of true -> {Msg, State1} = %% can return [] if msg in file existed on startup - case ets:lookup(?CUR_FILE_CACHE_ETS_NAME, MsgId) of + case ets:lookup(CurFileCacheEts, MsgId) of [] -> ok = case {ok, Offset} >= file_handle_cache:current_raw_offset(CurHdl) of true -> file_handle_cache:flush(CurHdl); false -> ok end, - read_from_disk(MsgLoc, State); + read_from_disk(MsgLoc, State, DedupCacheEts); [{MsgId, Msg1, _CacheRefCount}] -> {Msg1, State} end, - ok = maybe_insert_into_cache(RefCount, MsgId, Msg), + ok = maybe_insert_into_cache(DedupCacheEts, RefCount, MsgId, Msg), gen_server2:reply(From, {ok, Msg}), State1; false -> [#file_summary { locked = Locked }] = - ets:lookup(?FILE_SUMMARY_ETS_NAME, File), + ets:lookup(FileSummaryEts, File), case Locked of true -> add_to_pending_gc_completion({read, MsgId, From}, State); false -> - {Msg, State1} = read_from_disk(MsgLoc, State), + {Msg, State1} = read_from_disk(MsgLoc, State, DedupCacheEts), gen_server2:reply(From, {ok, Msg}), State1 end @@ -748,7 +802,8 @@ read_message1(From, #msg_location { msg_id = MsgId, ref_count = RefCount, read_from_disk(#msg_location { msg_id = MsgId, ref_count = RefCount, file = File, offset = Offset, - total_size = TotalSize }, State) -> + total_size = TotalSize }, State, + DedupCacheEts) -> {Hdl, State1} = get_read_handle(File, State), {ok, Offset} = file_handle_cache:position(Hdl, Offset), {ok, {MsgId, Msg}} = @@ -764,12 +819,13 @@ read_from_disk(#msg_location { msg_id = MsgId, ref_count = RefCount, {proc_dict, get()} ]}}) end, - ok = maybe_insert_into_cache(RefCount, MsgId, Msg), + ok = maybe_insert_into_cache(DedupCacheEts, RefCount, MsgId, Msg), {Msg, State1}. -maybe_insert_into_cache(RefCount, MsgId, Msg) when RefCount > 1 -> - insert_into_cache(MsgId, Msg); -maybe_insert_into_cache(_RefCount, _MsgId, _Msg) -> +maybe_insert_into_cache(DedupCacheEts, RefCount, MsgId, Msg) + when RefCount > 1 -> + insert_into_cache(DedupCacheEts, MsgId, Msg); +maybe_insert_into_cache(_DedupCacheEts, _RefCount, _MsgId, _Msg) -> ok. contains_message(MsgId, From, State = #msstate { gc_active = GCActive }) -> @@ -788,7 +844,9 @@ contains_message(MsgId, From, State = #msstate { gc_active = GCActive }) -> end end. -remove_message(MsgId, State = #msstate { sum_valid_data = SumValid }) -> +remove_message(MsgId, State = #msstate { sum_valid_data = SumValid, + file_summary_ets = FileSummaryEts, + dedup_cache_ets = DedupCacheEts }) -> #msg_location { ref_count = RefCount, file = File, offset = Offset, total_size = TotalSize } = index_lookup(MsgId, State), @@ -797,11 +855,11 @@ remove_message(MsgId, State = #msstate { sum_valid_data = SumValid }) -> %% don't remove from CUR_FILE_CACHE_ETS_NAME here because %% there may be further writes in the mailbox for the same %% msg. - ok = remove_cache_entry(MsgId), + ok = remove_cache_entry(DedupCacheEts, MsgId), [#file_summary { valid_total_size = ValidTotalSize, contiguous_top = ContiguousTop, locked = Locked }] = - ets:lookup(?FILE_SUMMARY_ETS_NAME, File), + ets:lookup(FileSummaryEts, File), case Locked of true -> add_to_pending_gc_completion({remove, MsgId}, State); @@ -810,15 +868,14 @@ remove_message(MsgId, State = #msstate { sum_valid_data = SumValid }) -> ContiguousTop1 = lists:min([ContiguousTop, Offset]), ValidTotalSize1 = ValidTotalSize - TotalSize, true = ets:update_element( - ?FILE_SUMMARY_ETS_NAME, - File, + FileSummaryEts, File, [{#file_summary.valid_total_size, ValidTotalSize1}, {#file_summary.contiguous_top, ContiguousTop1}]), State1 = delete_file_if_empty(File, State), State1 #msstate { sum_valid_data = SumValid - TotalSize } end; _ when 1 < RefCount -> - ok = decrement_cache(MsgId), + ok = decrement_cache(DedupCacheEts, MsgId), %% only update field, otherwise bad interaction with concurrent GC ok = index_update_fields(MsgId, {#msg_location.ref_count, RefCount - 1}, @@ -857,11 +914,12 @@ close_handle(Key, FHC) -> error -> FHC end. -close_all_handles(CState = #client_msstate { file_handle_cache = FHC }) -> +close_all_handles(CState = #client_msstate { file_handles_ets = FileHandlesEts, + file_handle_cache = FHC }) -> Self = self(), ok = dict:fold(fun (File, Hdl, ok) -> true = - ets:delete(?FILE_HANDLES_ETS_NAME, {Self, File}), + ets:delete(FileHandlesEts, {Self, File}), file_handle_cache:close(Hdl) end, ok, FHC), CState #client_msstate { file_handle_cache = dict:new() }; @@ -897,27 +955,27 @@ get_read_handle(FileNum, FHC, Dir) -> %% message cache helper functions %%---------------------------------------------------------------------------- -remove_cache_entry(MsgId) -> - true = ets:delete(?CACHE_ETS_NAME, MsgId), +remove_cache_entry(DedupCacheEts, MsgId) -> + true = ets:delete(DedupCacheEts, MsgId), ok. -fetch_and_increment_cache(MsgId) -> - case ets:lookup(?CACHE_ETS_NAME, MsgId) of +fetch_and_increment_cache(DedupCacheEts, MsgId) -> + case ets:lookup(DedupCacheEts, MsgId) of [] -> not_found; [{_MsgId, Msg, _RefCount}] -> try - ets:update_counter(?CACHE_ETS_NAME, MsgId, {3, 1}) + ets:update_counter(DedupCacheEts, MsgId, {3, 1}) catch error:badarg -> %% someone has deleted us in the meantime, insert us - ok = insert_into_cache(MsgId, Msg) + ok = insert_into_cache(DedupCacheEts, MsgId, Msg) end, Msg end. -decrement_cache(MsgId) -> - true = try case ets:update_counter(?CACHE_ETS_NAME, MsgId, {3, -1}) of - N when N =< 0 -> true = ets:delete(?CACHE_ETS_NAME, MsgId); +decrement_cache(DedupCacheEts, MsgId) -> + true = try case ets:update_counter(DedupCacheEts, MsgId, {3, -1}) of + N when N =< 0 -> true = ets:delete(DedupCacheEts, MsgId); _N -> true end catch error:badarg -> @@ -928,14 +986,14 @@ decrement_cache(MsgId) -> end, ok. -insert_into_cache(MsgId, Msg) -> - case ets:insert_new(?CACHE_ETS_NAME, {MsgId, Msg, 1}) of +insert_into_cache(DedupCacheEts, MsgId, Msg) -> + case ets:insert_new(DedupCacheEts, {MsgId, Msg, 1}) of true -> ok; false -> try - ets:update_counter(?CACHE_ETS_NAME, MsgId, {3, 1}), + ets:update_counter(DedupCacheEts, MsgId, {3, 1}), ok catch error:badarg -> - insert_into_cache(MsgId, Msg) + insert_into_cache(DedupCacheEts, MsgId, Msg) end end. @@ -1125,16 +1183,17 @@ build_index(Files, State) -> {Offset, State1} = build_index(undefined, Files, State), {Offset, lists:foldl(fun delete_file_if_empty/2, State1, Files)}. -build_index(Left, [], State) -> +build_index(Left, [], State = #msstate { file_summary_ets = FileSummaryEts }) -> ok = index_delete_by_file(undefined, State), - Offset = case ets:lookup(?FILE_SUMMARY_ETS_NAME, Left) of + Offset = case ets:lookup(FileSummaryEts, Left) of [] -> 0; [#file_summary { file_size = FileSize }] -> FileSize end, {Offset, State #msstate { current_file = Left }}; build_index(Left, [File|Files], State = #msstate { dir = Dir, sum_valid_data = SumValid, - sum_file_size = SumFileSize }) -> + sum_file_size = SumFileSize, + file_summary_ets = FileSummaryEts }) -> {ok, Messages, FileSize} = rabbit_msg_store_misc:scan_file_for_valid_messages( Dir, rabbit_msg_store_misc:filenum_to_name(File)), @@ -1168,7 +1227,7 @@ build_index(Left, [File|Files], [F|_] -> {F, FileSize} end, true = - ets:insert_new(?FILE_SUMMARY_ETS_NAME, #file_summary { + ets:insert_new(FileSummaryEts, #file_summary { file = File, valid_total_size = ValidTotalSize, contiguous_top = ContiguousTop, locked = false, left = Left, right = Right, file_size = FileSize1, @@ -1184,61 +1243,65 @@ build_index(Left, [File|Files], maybe_roll_to_new_file(Offset, State = #msstate { dir = Dir, current_file_handle = CurHdl, - current_file = CurFile }) + current_file = CurFile, + file_summary_ets = FileSummaryEts, + cur_file_cache_ets = CurFileCacheEts }) when Offset >= ?FILE_SIZE_LIMIT -> - State1 = sync(State), + State1 = internal_sync(State), ok = file_handle_cache:close(CurHdl), NextFile = CurFile + 1, {ok, NextHdl} = rabbit_msg_store_misc:open_file( Dir, rabbit_msg_store_misc:filenum_to_name(NextFile), ?WRITE_MODE), true = ets:insert_new( - ?FILE_SUMMARY_ETS_NAME, #file_summary { + FileSummaryEts, #file_summary { file = NextFile, valid_total_size = 0, contiguous_top = 0, left = CurFile, right = undefined, file_size = 0, locked = false, readers = 0 }), - true = ets:update_element(?FILE_SUMMARY_ETS_NAME, CurFile, + true = ets:update_element(FileSummaryEts, CurFile, {#file_summary.right, NextFile}), - true = ets:match_delete(?CUR_FILE_CACHE_ETS_NAME, {'_', '_', 0}), + true = ets:match_delete(CurFileCacheEts, {'_', '_', 0}), State1 #msstate { current_file_handle = NextHdl, current_file = NextFile }; maybe_roll_to_new_file(_, State) -> State. -maybe_compact(State = #msstate { sum_valid_data = SumValid, - sum_file_size = SumFileSize, - gc_active = false }) +maybe_compact(State = #msstate { sum_valid_data = SumValid, + sum_file_size = SumFileSize, + gc_active = false, + gc_pid = GCPid, + file_summary_ets = FileSummaryEts }) when (SumFileSize - SumValid) / SumFileSize > ?GARBAGE_FRACTION -> - First = ets:first(?FILE_SUMMARY_ETS_NAME), + First = ets:first(FileSummaryEts), N = random_distributions:geometric(?GEOMETRIC_P), - case find_files_to_gc(N, First) of + case find_files_to_gc(FileSummaryEts, N, First) of undefined -> State; {Source, Dest} -> State1 = close_handle(Source, close_handle(Dest, State)), - true = ets:update_element(?FILE_SUMMARY_ETS_NAME, Source, + true = ets:update_element(FileSummaryEts, Source, {#file_summary.locked, true}), - true = ets:update_element(?FILE_SUMMARY_ETS_NAME, Dest, + true = ets:update_element(FileSummaryEts, Dest, {#file_summary.locked, true}), - ok = rabbit_msg_store_gc:gc(Source, Dest), + ok = rabbit_msg_store_gc:gc(GCPid, Source, Dest), State1 #msstate { gc_active = {Source, Dest} } end; maybe_compact(State) -> State. -mark_handle_to_close(File) -> - [ ets:update_element(?FILE_HANDLES_ETS_NAME, Key, {2, close}) - || {Key, open} <- ets:match_object(?FILE_HANDLES_ETS_NAME, +mark_handle_to_close(FileHandlesEts, File) -> + [ ets:update_element(FileHandlesEts, Key, {2, close}) + || {Key, open} <- ets:match_object(FileHandlesEts, {{'_', File}, open}) ], true. -find_files_to_gc(_N, '$end_of_table') -> +find_files_to_gc(_FileSummaryEts, _N, '$end_of_table') -> undefined; -find_files_to_gc(N, First) -> +find_files_to_gc(FileSummaryEts, N, First) -> [FirstObj = #file_summary { right = Right }] = - ets:lookup(?FILE_SUMMARY_ETS_NAME, First), - Pairs = find_files_to_gc(N, FirstObj, - ets:lookup(?FILE_SUMMARY_ETS_NAME, Right), []), + ets:lookup(FileSummaryEts, First), + Pairs = find_files_to_gc(FileSummaryEts, N, FirstObj, + ets:lookup(FileSummaryEts, Right), []), case Pairs of [] -> undefined; [Pair] -> Pair; @@ -1246,9 +1309,9 @@ find_files_to_gc(N, First) -> lists:nth(M, Pairs) end. -find_files_to_gc(_N, #file_summary {}, [], Pairs) -> +find_files_to_gc(_FileSummaryEts, _N, #file_summary {}, [], Pairs) -> lists:reverse(Pairs); -find_files_to_gc(N, +find_files_to_gc(FileSummaryEts, N, #file_summary { right = Source, file = Dest, valid_total_size = DestValid }, [SourceObj = #file_summary { left = Dest, right = SourceRight, @@ -1259,22 +1322,24 @@ find_files_to_gc(N, Pair = {Source, Dest}, case N == 1 of true -> [Pair]; - false -> find_files_to_gc((N - 1), SourceObj, - ets:lookup(?FILE_SUMMARY_ETS_NAME, SourceRight), + false -> find_files_to_gc(FileSummaryEts, (N - 1), SourceObj, + ets:lookup(FileSummaryEts, SourceRight), [Pair | Pairs]) end; -find_files_to_gc(N, _Left, +find_files_to_gc(FileSummaryEts, N, _Left, [Right = #file_summary { right = RightRight }], Pairs) -> find_files_to_gc( - N, Right, ets:lookup(?FILE_SUMMARY_ETS_NAME, RightRight), Pairs). + FileSummaryEts, N, Right, ets:lookup(FileSummaryEts, RightRight), Pairs). delete_file_if_empty(File, State = #msstate { current_file = File }) -> State; delete_file_if_empty(File, State = - #msstate { dir = Dir, sum_file_size = SumFileSize }) -> + #msstate { dir = Dir, sum_file_size = SumFileSize, + file_handles_ets = FileHandlesEts, + file_summary_ets = FileSummaryEts }) -> [#file_summary { valid_total_size = ValidData, file_size = FileSize, left = Left, right = Right, locked = false }] - = ets:lookup(?FILE_SUMMARY_ETS_NAME, File), + = ets:lookup(FileSummaryEts, File), case ValidData of %% we should NEVER find the current file in here hence right %% should always be a file, not undefined @@ -1282,16 +1347,16 @@ delete_file_if_empty(File, State = {undefined, _} when not is_atom(Right) -> %% the eldest file is empty. true = ets:update_element( - ?FILE_SUMMARY_ETS_NAME, Right, + FileSummaryEts, Right, {#file_summary.left, undefined}); {_, _} when not is_atom(Right) -> - true = ets:update_element(?FILE_SUMMARY_ETS_NAME, Right, + true = ets:update_element(FileSummaryEts, Right, {#file_summary.left, Left}), - true = ets:update_element(?FILE_SUMMARY_ETS_NAME, Left, + true = ets:update_element(FileSummaryEts, Left, {#file_summary.right, Right}) end, - true = mark_handle_to_close(File), - true = ets:delete(?FILE_SUMMARY_ETS_NAME, File), + true = mark_handle_to_close(FileHandlesEts, File), + true = ets:delete(FileSummaryEts, File), State1 = close_handle(File, State), ok = file:delete(rabbit_msg_store_misc:form_filename( Dir, diff --git a/src/rabbit_msg_store_gc.erl b/src/rabbit_msg_store_gc.erl index a64733dfc2..9cf11af29f 100644 --- a/src/rabbit_msg_store_gc.erl +++ b/src/rabbit_msg_store_gc.erl @@ -33,7 +33,7 @@ -behaviour(gen_server2). --export([start_link/3, gc/2, stop/0]). +-export([start_link/4, gc/3, stop/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -41,40 +41,42 @@ -record(gcstate, {dir, index_state, - index_module + index_module, + parent, + file_summary_ets }). -include("rabbit_msg_store.hrl"). --define(SERVER, ?MODULE). - %%---------------------------------------------------------------------------- -start_link(Dir, IndexState, IndexModule) -> - gen_server2:start_link({local, ?SERVER}, ?MODULE, - [Dir, IndexState, IndexModule], - [{timeout, infinity}]). +start_link(Dir, IndexState, IndexModule, FileSummaryEts) -> + gen_server2:start_link( + ?MODULE, [self(), Dir, IndexState, IndexModule, FileSummaryEts], + [{timeout, infinity}]). -gc(Source, Destination) -> - gen_server2:cast(?SERVER, {gc, Source, Destination}). +gc(Server, Source, Destination) -> + gen_server2:cast(Server, {gc, Source, Destination}). -stop() -> - gen_server2:call(?SERVER, stop). +stop(Server) -> + gen_server2:call(Server, stop). %%---------------------------------------------------------------------------- -init([Dir, IndexState, IndexModule]) -> +init([Parent, Dir, IndexState, IndexModule, FileSummaryEts]) -> {ok, #gcstate { dir = Dir, index_state = IndexState, - index_module = IndexModule }, + index_module = IndexModule, parent = Parent, + file_summary_ets = FileSummaryEts}, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. handle_call(stop, _From, State) -> {stop, normal, ok, State}. -handle_cast({gc, Source, Destination}, State) -> - Reclaimed = adjust_meta_and_combine(Source, Destination, State), - ok = rabbit_msg_store:gc_done(Reclaimed, Source, Destination), +handle_cast({gc, Source, Destination}, State = #gcstate { parent = Parent }) -> + Reclaimed = adjust_meta_and_combine(Source, Destination, + State), + ok = rabbit_msg_store:gc_done(Parent, Reclaimed, Source, Destination), {noreply, State, hibernate}. handle_info({file_handle_cache, maximum_eldest_since_use, Age}, State) -> @@ -92,18 +94,19 @@ code_change(_OldVsn, State, _Extra) -> %%---------------------------------------------------------------------------- -adjust_meta_and_combine(SourceFile, DestFile, State) -> +adjust_meta_and_combine(SourceFile, DestFile, State = + #gcstate { file_summary_ets = FileSummaryEts }) -> [SourceObj = #file_summary { readers = SourceReaders, valid_total_size = SourceValidData, left = DestFile, file_size = SourceFileSize, locked = true }] = - ets:lookup(?FILE_SUMMARY_ETS_NAME, SourceFile), + ets:lookup(FileSummaryEts, SourceFile), [DestObj = #file_summary { readers = DestReaders, valid_total_size = DestValidData, right = SourceFile, file_size = DestFileSize, locked = true }] = - ets:lookup(?FILE_SUMMARY_ETS_NAME, DestFile), + ets:lookup(FileSummaryEts, DestFile), case SourceReaders =:= 0 andalso DestReaders =:= 0 of true -> @@ -112,7 +115,7 @@ adjust_meta_and_combine(SourceFile, DestFile, State) -> %% don't update dest.right, because it could be changing %% at the same time true = ets:update_element( - ?FILE_SUMMARY_ETS_NAME, DestFile, + FileSummaryEts, DestFile, [{#file_summary.valid_total_size, TotalValidData}, {#file_summary.contiguous_top, TotalValidData}, {#file_summary.file_size, TotalValidData}]), diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index a16efb20c0..c0c1b40bd5 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -34,7 +34,8 @@ -export([init/1, terminate/1, terminate_and_erase/1, write_published/4, write_delivered/2, write_acks/2, sync_seq_ids/2, flush_journal/1, read_segment_entries/2, next_segment_boundary/1, segment_size/0, - find_lowest_seq_id_seg_and_next_seq_id/1, start_msg_store/1]). + find_lowest_seq_id_seg_and_next_seq_id/1, + start_persistent_msg_store/1]). -define(CLEAN_FILENAME, "clean.dot"). @@ -207,7 +208,7 @@ -spec(segment_size/0 :: () -> non_neg_integer()). -spec(find_lowest_seq_id_seg_and_next_seq_id/1 :: (qistate()) -> {non_neg_integer(), non_neg_integer(), qistate()}). --spec(start_msg_store/1 :: ([amqqueue()]) -> 'ok'). +-spec(start_persistent_msg_store/1 :: ([amqqueue()]) -> 'ok'). -endif. @@ -250,7 +251,8 @@ init(Name) -> {Segment3, DCountAcc2}) -> {Segment4, DCountDelta} = maybe_add_to_journal( - rabbit_msg_store:contains(MsgId), + rabbit_msg_store:contains( + ?PERSISTENT_MSG_STORE, MsgId), CleanShutdown, Del, RelSeq, Segment3), {Segment4, DCountAcc2 + DCountDelta} end, {Segment1 #segment { pubs = PubCount, @@ -379,7 +381,7 @@ find_lowest_seq_id_seg_and_next_seq_id(State) -> end, {LowSeqIdSeg, NextSeqId, State}. -start_msg_store(DurableQueues) -> +start_persistent_msg_store(DurableQueues) -> DurableDict = dict:from_list([ {queue_name_to_dir_name(Queue #amqqueue.name), Queue #amqqueue.name} || Queue <- DurableQueues ]), @@ -404,10 +406,9 @@ start_msg_store(DurableQueues) -> {DurableAcc, [QueueDir | TransientAcc]} end end, {[], []}, Directories), - MsgStoreDir = filename:join(rabbit_mnesia:dir(), "msg_store"), - ok = rabbit_sup:start_child(rabbit_msg_store, [MsgStoreDir, - fun queue_index_walker/1, - DurableQueueNames]), + ok = rabbit_sup:start_child(?PERSISTENT_MSG_STORE, rabbit_msg_store, + [?PERSISTENT_MSG_STORE, rabbit_mnesia:dir(), + fun queue_index_walker/1, DurableQueueNames]), lists:foreach(fun (DirName) -> Dir = filename:join(queues_dir(), DirName), ok = delete_queue_directory(Dir) diff --git a/src/rabbit_sup.erl b/src/rabbit_sup.erl index 25715e6e29..2c5e51125e 100644 --- a/src/rabbit_sup.erl +++ b/src/rabbit_sup.erl @@ -33,7 +33,7 @@ -behaviour(supervisor). --export([start_link/0, start_child/1, start_child/2, +-export([start_link/0, start_child/1, start_child/2, start_child/3, start_restartable_child/1, start_restartable_child/2]). -export([init/1]). @@ -49,8 +49,11 @@ start_child(Mod) -> start_child(Mod, []). start_child(Mod, Args) -> + start_child(Mod, Mod, Args). + +start_child(ChildId, Mod, Args) -> {ok, _} = supervisor:start_child(?SERVER, - {Mod, {Mod, start_link, Args}, + {ChildId, {Mod, start_link, Args}, transient, ?MAX_WAIT, worker, [Mod]}), ok. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 96c3f1bcf2..3ccb83b6f4 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -991,20 +991,28 @@ bad_handle_hook(_, _, _) -> extra_arg_hook(Hookname, Handler, Args, Extra1, Extra2) -> handle_hook(Hookname, Handler, {Args, Extra1, Extra2}). -msg_store_dir() -> - filename:join(rabbit_mnesia:dir(), "msg_store"). - start_msg_store_empty() -> start_msg_store(fun (ok) -> finished end, ok). start_msg_store(MsgRefDeltaGen, MsgRefDeltaGenInit) -> - rabbit_sup:start_child(rabbit_msg_store, [msg_store_dir(), MsgRefDeltaGen, - MsgRefDeltaGenInit]). + ok = rabbit_sup:start_child(?PERSISTENT_MSG_STORE, rabbit_msg_store, + [?PERSISTENT_MSG_STORE, rabbit_mnesia:dir(), + MsgRefDeltaGen, MsgRefDeltaGenInit]), + start_transient_msg_store(). + +start_transient_msg_store() -> + ok = rabbit_sup:start_child(?TRANSIENT_MSG_STORE, rabbit_msg_store, + [?TRANSIENT_MSG_STORE, rabbit_mnesia:dir(), + fun (ok) -> finished end, ok]). stop_msg_store() -> - case supervisor:terminate_child(rabbit_sup, rabbit_msg_store) of - ok -> supervisor:delete_child(rabbit_sup, rabbit_msg_store); - E -> E + case supervisor:terminate_child(rabbit_sup, ?PERSISTENT_MSG_STORE) of + ok -> supervisor:delete_child(rabbit_sup, ?PERSISTENT_MSG_STORE), + case supervisor:terminate_child(rabbit_sup, ?TRANSIENT_MSG_STORE) of + ok -> supervisor:delete_child(rabbit_sup, ?TRANSIENT_MSG_STORE); + E -> {transient, E} + end; + E -> {persistent, E} end. msg_id_bin(X) -> @@ -1012,13 +1020,14 @@ msg_id_bin(X) -> msg_store_contains(Atom, MsgIds) -> Atom = lists:foldl( - fun (MsgId, Atom1) when Atom1 =:= Atom -> - rabbit_msg_store:contains(MsgId) end, Atom, MsgIds). + fun (MsgId, Atom1) when Atom1 =:= Atom -> + rabbit_msg_store:contains(?PERSISTENT_MSG_STORE, MsgId) end, + Atom, MsgIds). msg_store_sync(MsgIds) -> Ref = make_ref(), Self = self(), - ok = rabbit_msg_store:sync(MsgIds, + ok = rabbit_msg_store:sync(?PERSISTENT_MSG_STORE, MsgIds, fun () -> Self ! {sync, Ref} end), receive {sync, Ref} -> ok @@ -1031,15 +1040,17 @@ msg_store_sync(MsgIds) -> msg_store_read(MsgIds, MSCState) -> lists:foldl( fun (MsgId, MSCStateM) -> - {{ok, MsgId}, MSCStateN} = rabbit_msg_store:read(MsgId, MSCStateM), + {{ok, MsgId}, MSCStateN} = rabbit_msg_store:read( + ?PERSISTENT_MSG_STORE, MsgId, MSCStateM), MSCStateN end, MSCState, MsgIds). -msg_store_write(MsgIds) -> - ok = lists:foldl( - fun (MsgId, ok) -> rabbit_msg_store:write(MsgId, MsgId) end, - ok, MsgIds). +msg_store_write(MsgIds, MSCState) -> + lists:foldl( + fun (MsgId, {ok, MSCStateN}) -> + rabbit_msg_store:write(?PERSISTENT_MSG_STORE, MsgId, MsgId, MSCStateN) end, + {ok, MSCState}, MsgIds). test_msg_store() -> stop_msg_store(), @@ -1049,25 +1060,27 @@ test_msg_store() -> {MsgIds1stHalf, MsgIds2ndHalf} = lists:split(50, MsgIds), %% check we don't contain any of the msgs we're about to publish false = msg_store_contains(false, MsgIds), + MSCState = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE), %% publish the first half - ok = msg_store_write(MsgIds1stHalf), + {ok, MSCState1} = msg_store_write(MsgIds1stHalf, MSCState), %% sync on the first half ok = msg_store_sync(MsgIds1stHalf), %% publish the second half - ok = msg_store_write(MsgIds2ndHalf), + {ok, MSCState2} = msg_store_write(MsgIds2ndHalf, MSCState1), %% sync on the first half again - the msg_store will be dirty, but %% we won't need the fsync ok = msg_store_sync(MsgIds1stHalf), %% check they're all in there true = msg_store_contains(true, MsgIds), %% publish the latter half twice so we hit the caching and ref count code - ok = msg_store_write(MsgIds2ndHalf), + {ok, MSCState3} = msg_store_write(MsgIds2ndHalf, MSCState2), %% check they're still all in there true = msg_store_contains(true, MsgIds), %% sync on the 2nd half, but do lots of individual syncs to try %% and cause coalescing to happen ok = lists:foldl( fun (MsgId, ok) -> rabbit_msg_store:sync( + ?PERSISTENT_MSG_STORE, [MsgId], fun () -> Self ! {sync, MsgId} end) end, ok, MsgIds2ndHalf), lists:foldl( @@ -1085,23 +1098,22 @@ test_msg_store() -> %% should hit a different code path ok = msg_store_sync(MsgIds1stHalf), %% read them all - MSCState = rabbit_msg_store:client_init(), - MSCState1 = msg_store_read(MsgIds, MSCState), + MSCState4 = msg_store_read(MsgIds, MSCState3), %% read them all again - this will hit the cache, not disk - MSCState2 = msg_store_read(MsgIds, MSCState1), + MSCState5 = msg_store_read(MsgIds, MSCState4), %% remove them all - ok = rabbit_msg_store:remove(MsgIds), + ok = rabbit_msg_store:remove(?PERSISTENT_MSG_STORE, MsgIds), %% check first half doesn't exist false = msg_store_contains(false, MsgIds1stHalf), %% check second half does exist true = msg_store_contains(true, MsgIds2ndHalf), %% read the second half again - MSCState3 = msg_store_read(MsgIds2ndHalf, MSCState2), + MSCState6 = msg_store_read(MsgIds2ndHalf, MSCState5), %% release the second half, just for fun (aka code coverage) - ok = rabbit_msg_store:release(MsgIds2ndHalf), + ok = rabbit_msg_store:release(?PERSISTENT_MSG_STORE, MsgIds2ndHalf), %% read the second half again, just for fun (aka code coverage) - MSCState4 = msg_store_read(MsgIds2ndHalf, MSCState3), - ok = rabbit_msg_store:client_terminate(MSCState4), + MSCState7 = msg_store_read(MsgIds2ndHalf, MSCState6), + ok = rabbit_msg_store:client_terminate(MSCState7), %% stop and restart, preserving every other msg in 2nd half ok = stop_msg_store(), ok = start_msg_store(fun ([]) -> finished; @@ -1114,7 +1126,7 @@ test_msg_store() -> %% check we have the right msgs left lists:foldl( fun (MsgId, Bool) -> - not(Bool = rabbit_msg_store:contains(MsgId)) + not(Bool = rabbit_msg_store:contains(?PERSISTENT_MSG_STORE, MsgId)) end, false, MsgIds2ndHalf), %% restart empty ok = stop_msg_store(), @@ -1122,11 +1134,12 @@ test_msg_store() -> %% check we don't contain any of the msgs false = msg_store_contains(false, MsgIds), %% publish the first half again - ok = msg_store_write(MsgIds1stHalf), + MSCState8 = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE), + {ok, MSCState9} = msg_store_write(MsgIds1stHalf, MSCState8), %% this should force some sort of sync internally otherwise misread ok = rabbit_msg_store:client_terminate( - msg_store_read(MsgIds1stHalf, rabbit_msg_store:client_init())), - ok = rabbit_msg_store:remove(MsgIds1stHalf), + msg_store_read(MsgIds1stHalf, MSCState9)), + ok = rabbit_msg_store:remove(?PERSISTENT_MSG_STORE, MsgIds1stHalf), %% restart empty ok = stop_msg_store(), ok = start_msg_store_empty(), %% now safe to reuse msg_ids @@ -1134,34 +1147,37 @@ test_msg_store() -> BigCount = 100000, MsgIdsBig = [msg_id_bin(X) || X <- lists:seq(1, BigCount)], Payload = << 0:65536 >>, - ok = lists:foldl( - fun (MsgId, ok) -> - rabbit_msg_store:write(MsgId, Payload) - end, ok, MsgIdsBig), + ok = rabbit_msg_store:client_terminate( + lists:foldl( + fun (MsgId, MSCStateN) -> + {ok, MSCStateM} = + rabbit_msg_store:write(?PERSISTENT_MSG_STORE, MsgId, Payload, MSCStateN), + MSCStateM + end, rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE), MsgIdsBig)), %% now read them to ensure we hit the fast client-side reading ok = rabbit_msg_store:client_terminate( lists:foldl( fun (MsgId, MSCStateM) -> {{ok, Payload}, MSCStateN} = - rabbit_msg_store:read(MsgId, MSCStateM), + rabbit_msg_store:read(?PERSISTENT_MSG_STORE, MsgId, MSCStateM), MSCStateN - end, rabbit_msg_store:client_init(), MsgIdsBig)), + end, rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE), MsgIdsBig)), %% .., then 3s by 1... ok = lists:foldl( fun (MsgId, ok) -> - rabbit_msg_store:remove([msg_id_bin(MsgId)]) + rabbit_msg_store:remove(?PERSISTENT_MSG_STORE, [msg_id_bin(MsgId)]) end, ok, lists:seq(BigCount, 1, -3)), %% .., then remove 3s by 2, from the young end first. This hits %% GC (under 50% good data left, but no empty files. Must GC). ok = lists:foldl( fun (MsgId, ok) -> - rabbit_msg_store:remove([msg_id_bin(MsgId)]) + rabbit_msg_store:remove(?PERSISTENT_MSG_STORE, [msg_id_bin(MsgId)]) end, ok, lists:seq(BigCount-1, 1, -3)), %% .., then remove 3s by 3, from the young end first. This hits %% GC... ok = lists:foldl( fun (MsgId, ok) -> - rabbit_msg_store:remove([msg_id_bin(MsgId)]) + rabbit_msg_store:remove(?PERSISTENT_MSG_STORE, [msg_id_bin(MsgId)]) end, ok, lists:seq(BigCount-2, 1, -3)), %% ensure empty false = msg_store_contains(false, MsgIdsBig), @@ -1184,20 +1200,25 @@ test_amqqueue(Durable) -> pid = none}. empty_test_queue() -> - ok = rabbit_queue_index:start_msg_store([]), + ok = start_transient_msg_store(), + ok = rabbit_queue_index:start_persistent_msg_store([]), {0, Qi1} = rabbit_queue_index:init(test_queue()), _Qi2 = rabbit_queue_index:terminate_and_erase(Qi1), ok. queue_index_publish(SeqIds, Persistent, Qi) -> - lists:foldl( - fun (SeqId, {QiN, SeqIdsMsgIdsAcc}) -> - MsgId = rabbit_guid:guid(), - QiM = rabbit_queue_index:write_published(MsgId, SeqId, Persistent, - QiN), - ok = rabbit_msg_store:write(MsgId, MsgId), - {QiM, [{SeqId, MsgId} | SeqIdsMsgIdsAcc]} - end, {Qi, []}, SeqIds). + {A, B, MSCStateEnd} = + lists:foldl( + fun (SeqId, {QiN, SeqIdsMsgIdsAcc, MSCStateN}) -> + MsgId = rabbit_guid:guid(), + QiM = rabbit_queue_index:write_published(MsgId, SeqId, Persistent, + QiN), + {ok, MSCStateM} = rabbit_msg_store:write(?PERSISTENT_MSG_STORE, MsgId, + MsgId, MSCStateN), + {QiM, [{SeqId, MsgId} | SeqIdsMsgIdsAcc], MSCStateM} + end, {Qi, [], rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE)}, SeqIds), + ok = rabbit_msg_store:client_terminate(MSCStateEnd), + {A, B}. queue_index_deliver(SeqIds, Qi) -> lists:foldl( @@ -1236,7 +1257,8 @@ test_queue_index() -> %% call terminate twice to prove it's idempotent _Qi5 = rabbit_queue_index:terminate(rabbit_queue_index:terminate(Qi4)), ok = stop_msg_store(), - ok = rabbit_queue_index:start_msg_store([test_amqqueue(true)]), + ok = rabbit_queue_index:start_persistent_msg_store([test_amqqueue(true)]), + ok = start_transient_msg_store(), %% should get length back as 0, as all the msgs were transient {0, Qi6} = rabbit_queue_index:init(test_queue()), {0, SegSize, Qi7} = @@ -1249,7 +1271,8 @@ test_queue_index() -> lists:reverse(SeqIdsMsgIdsB)), _Qi11 = rabbit_queue_index:terminate(Qi10), ok = stop_msg_store(), - ok = rabbit_queue_index:start_msg_store([test_amqqueue(true)]), + ok = rabbit_queue_index:start_persistent_msg_store([test_amqqueue(true)]), + ok = start_transient_msg_store(), %% should get length back as 10000 LenB = length(SeqIdsB), {LenB, Qi12} = rabbit_queue_index:init(test_queue()), @@ -1266,7 +1289,8 @@ test_queue_index() -> rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi17), _Qi19 = rabbit_queue_index:terminate(Qi18), ok = stop_msg_store(), - ok = rabbit_queue_index:start_msg_store([test_amqqueue(true)]), + ok = rabbit_queue_index:start_persistent_msg_store([test_amqqueue(true)]), + ok = start_transient_msg_store(), %% should get length back as 0 because all persistent msgs have been acked {0, Qi20} = rabbit_queue_index:init(test_queue()), _Qi21 = rabbit_queue_index:terminate_and_erase(Qi20), @@ -1307,7 +1331,8 @@ test_queue_index() -> Qi40 = queue_index_flush_journal(Qi39), _Qi41 = rabbit_queue_index:terminate_and_erase(Qi40), ok = stop_msg_store(), - ok = rabbit_queue_index:start_msg_store([]), + ok = rabbit_queue_index:start_persistent_msg_store([]), + ok = start_transient_msg_store(), ok = stop_msg_store(), passed. diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 0043bb5ff7..b9714f535b 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -154,7 +154,7 @@ rate_timestamp, len, on_sync, - msg_store_read_state + msg_store_clients }). -include("rabbit.hrl"). @@ -186,7 +186,7 @@ -type(bpqueue() :: any()). -type(msg_id() :: binary()). -type(seq_id() :: non_neg_integer()). --type(ack() :: {'ack_index_and_store', msg_id(), seq_id()} +-type(ack() :: {'ack_index_and_store', msg_id(), seq_id(), boolean()} | 'ack_not_on_disk'). -type(vqstate() :: #vqstate { q1 :: queue(), @@ -210,7 +210,7 @@ rate_timestamp :: {integer(), integer(), integer()}, len :: non_neg_integer(), on_sync :: {[ack()], [msg_id()], [{pid(), any()}]}, - msg_store_read_state :: any() + msg_store_clients :: {any(), any()} }). -spec(init/1 :: (queue_name()) -> vqstate()). @@ -285,13 +285,15 @@ init(QueueName) -> rate_timestamp = Now, len = DeltaCount, on_sync = {[], [], []}, - msg_store_read_state = rabbit_msg_store:client_init() + msg_store_clients = {rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE), + rabbit_msg_store:client_init(?TRANSIENT_MSG_STORE)} }, maybe_deltas_to_betas(State). terminate(State = #vqstate { index_state = IndexState, - msg_store_read_state = MSCState }) -> - rabbit_msg_store:client_terminate(MSCState), + msg_store_clients = {MSCStateP, MSCStateT} }) -> + rabbit_msg_store:client_terminate(MSCStateP), + rabbit_msg_store:client_terminate(MSCStateT), State #vqstate { index_state = rabbit_queue_index:terminate(IndexState) }. publish(Msg, State) -> @@ -303,22 +305,24 @@ publish_delivered(Msg = #basic_message { guid = MsgId, State = #vqstate { len = 0, index_state = IndexState, next_seq_id = SeqId, out_counter = OutCount, - in_counter = InCount}) -> + in_counter = InCount, + msg_store_clients = MSCState }) -> State1 = State #vqstate { out_counter = OutCount + 1, in_counter = InCount + 1 }, MsgStatus = #msg_status { msg = Msg, msg_id = MsgId, seq_id = SeqId, is_persistent = IsPersistent, is_delivered = true, msg_on_disk = false, index_on_disk = false }, - MsgStatus1 = maybe_write_msg_to_disk(false, MsgStatus), + {MsgStatus1, MSCState1} = maybe_write_msg_to_disk(false, MsgStatus, MSCState), + State2 = State1 #vqstate { msg_store_clients = MSCState1 }, case MsgStatus1 #msg_status.msg_on_disk of true -> {#msg_status { index_on_disk = true }, IndexState1} = maybe_write_index_to_disk(false, MsgStatus1, IndexState), - {{ack_index_and_store, MsgId, SeqId}, - State1 #vqstate { index_state = IndexState1, + {{ack_index_and_store, MsgId, SeqId, IsPersistent}, + State2 #vqstate { index_state = IndexState1, next_seq_id = SeqId + 1 }}; false -> - {ack_not_on_disk, State1} + {ack_not_on_disk, State2} end. set_queue_ram_duration_target( @@ -404,9 +408,9 @@ fetch(State = case IndexOnDisk1 of true -> true = IsPersistent, %% ASSERTION true = MsgOnDisk, %% ASSERTION - {ack_index_and_store, MsgId, SeqId}; + {ack_index_and_store, MsgId, SeqId, IsPersistent}; false -> ok = case MsgOnDisk andalso not IsPersistent of - true -> rabbit_msg_store:remove([MsgId]); + true -> rabbit_msg_store:remove(find_msg_store(IsPersistent), [MsgId]); false -> ok end, ack_not_on_disk @@ -419,19 +423,25 @@ fetch(State = end. ack(AckTags, State = #vqstate { index_state = IndexState }) -> - {MsgIds, SeqIds} = + {MsgIdsPersistent, MsgIdsTransient, SeqIds} = lists:foldl( fun (ack_not_on_disk, Acc) -> Acc; - ({ack_index_and_store, MsgId, SeqId}, {MsgIds, SeqIds}) -> - {[MsgId | MsgIds], [SeqId | SeqIds]} - end, {[], []}, AckTags), + ({ack_index_and_store, MsgId, SeqId, true}, {MsgIdsP, MsgIdsT, SeqIds}) -> + {[MsgId | MsgIdsP], MsgIdsT, [SeqId | SeqIds]}; + ({ack_index_and_store, MsgId, SeqId, false}, {MsgIdsP, MsgIdsT, SeqIds}) -> + {MsgIdsP, [MsgId | MsgIdsT], [SeqId | SeqIds]} + end, {[], [], []}, AckTags), IndexState1 = case SeqIds of [] -> IndexState; _ -> rabbit_queue_index:write_acks(SeqIds, IndexState) end, - ok = case MsgIds of + ok = case MsgIdsPersistent of [] -> ok; - _ -> rabbit_msg_store:remove(MsgIds) + _ -> rabbit_msg_store:remove(?PERSISTENT_MSG_STORE, MsgIdsPersistent) + end, + ok = case MsgIdsTransient of + [] -> ok; + _ -> rabbit_msg_store:remove(?TRANSIENT_MSG_STORE, MsgIdsTransient) end, State #vqstate { index_state = IndexState1 }. @@ -453,7 +463,7 @@ purge(State = #vqstate { q4 = Q4, index_state = IndexState, len = Len }) -> %% needs to delete everything that's been delivered and not ack'd. delete_and_terminate(State) -> {_PurgeCount, State1 = #vqstate { index_state = IndexState, - msg_store_read_state = MSCState }} = + msg_store_clients = {MSCStateP, MSCStateT} }} = purge(State), IndexState1 = case rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id( @@ -466,7 +476,8 @@ delete_and_terminate(State) -> IndexState3 end, IndexState4 = rabbit_queue_index:terminate_and_erase(IndexState1), - rabbit_msg_store:client_terminate(MSCState), + rabbit_msg_store:client_terminate(MSCStateP), + rabbit_msg_store:client_terminate(MSCStateT), State1 #vqstate { index_state = IndexState4 }. %% [{Msg, AckTag}] @@ -479,45 +490,52 @@ delete_and_terminate(State) -> %% msg_store:release so that the cache isn't held full of msgs which %% are now at the tail of the queue. requeue(MsgsWithAckTags, State) -> - {SeqIds, MsgIds, State1 = #vqstate { index_state = IndexState }} = + {SeqIds, MsgIdsPersistent, MsgIdsTransient, + State1 = #vqstate { index_state = IndexState }} = lists:foldl( fun ({Msg = #basic_message { guid = MsgId }, AckTag}, - {SeqIdsAcc, MsgIdsAcc, StateN}) -> - {SeqIdsAcc1, MsgIdsAcc1, MsgOnDisk} = + {SeqIdsAcc, MsgIdsP, MsgIdsT, StateN}) -> + {SeqIdsAcc1, MsgIdsP1, MsgIdsT1, MsgOnDisk} = case AckTag of ack_not_on_disk -> - {SeqIdsAcc, MsgIdsAcc, false}; - {ack_index_and_store, MsgId, SeqId} -> - {[SeqId | SeqIdsAcc], [MsgId | MsgIdsAcc], true} + {SeqIdsAcc, MsgIdsP, MsgIdsT, false}; + {ack_index_and_store, MsgId, SeqId, true} -> + {[SeqId | SeqIdsAcc], [MsgId | MsgIdsP], MsgIdsT, true}; + {ack_index_and_store, MsgId, SeqId, false} -> + {[SeqId | SeqIdsAcc], MsgIdsP, [MsgId | MsgIdsT], true} end, {_SeqId, StateN1} = publish(Msg, true, MsgOnDisk, StateN), - {SeqIdsAcc1, MsgIdsAcc1, StateN1} - end, {[], [], State}, MsgsWithAckTags), + {SeqIdsAcc1, MsgIdsP1, MsgIdsT1, StateN1} + end, {[], [], [], State}, MsgsWithAckTags), IndexState1 = case SeqIds of [] -> IndexState; _ -> rabbit_queue_index:write_acks(SeqIds, IndexState) end, - ok = case MsgIds of + ok = case MsgIdsPersistent of + [] -> ok; + _ -> rabbit_msg_store:release(?PERSISTENT_MSG_STORE, MsgIdsPersistent) + end, + ok = case MsgIdsTransient of [] -> ok; - _ -> rabbit_msg_store:release(MsgIds) + _ -> rabbit_msg_store:release(?TRANSIENT_MSG_STORE, MsgIdsTransient) end, State1 #vqstate { index_state = IndexState1 }. tx_publish(Msg = #basic_message { is_persistent = true, guid = MsgId }, - State) -> + State = #vqstate { msg_store_clients = MSCState }) -> MsgStatus = #msg_status { msg = Msg, msg_id = MsgId, seq_id = undefined, is_persistent = true, is_delivered = false, msg_on_disk = false, index_on_disk = false }, - #msg_status { msg_on_disk = true } = - maybe_write_msg_to_disk(false, MsgStatus), - State; + {#msg_status { msg_on_disk = true }, MSCState1} = + maybe_write_msg_to_disk(false, MsgStatus, MSCState), + State #vqstate { msg_store_clients = MSCState1 }; tx_publish(_Msg, State) -> State. tx_rollback(Pubs, State) -> ok = case persistent_msg_ids(Pubs) of [] -> ok; - PP -> rabbit_msg_store:remove(PP) + PP -> rabbit_msg_store:remove(?PERSISTENT_MSG_STORE, PP) end, State. @@ -528,6 +546,7 @@ tx_commit(Pubs, AckTags, From, State) -> PersistentMsgIds -> Self = self(), ok = rabbit_msg_store:sync( + ?PERSISTENT_MSG_STORE, PersistentMsgIds, fun () -> ok = rabbit_amqqueue:tx_commit_msg_store_callback( Self, Pubs, AckTags, From) @@ -712,11 +731,15 @@ purge1(Count, State = #vqstate { q3 = Q3, index_state = IndexState }) -> end. remove_queue_entries(Fold, Q, IndexState) -> - {Count, MsgIds, SeqIds, IndexState1} = - Fold(fun remove_queue_entries1/2, {0, [], [], IndexState}, Q), - ok = case MsgIds of + {Count, MsgIdsPersistent, MsgIdsTransient, SeqIds, IndexState1} = + Fold(fun remove_queue_entries1/2, {0, [], [], [], IndexState}, Q), + ok = case MsgIdsPersistent of + [] -> ok; + _ -> rabbit_msg_store:remove(?PERSISTENT_MSG_STORE, MsgIdsPersistent) + end, + ok = case MsgIdsTransient of [] -> ok; - _ -> rabbit_msg_store:remove(MsgIds) + _ -> rabbit_msg_store:remove(?TRANSIENT_MSG_STORE, MsgIdsTransient) end, IndexState2 = case SeqIds of @@ -728,12 +751,14 @@ remove_queue_entries(Fold, Q, IndexState) -> remove_queue_entries1( #msg_status { msg_id = MsgId, seq_id = SeqId, is_delivered = IsDelivered, msg_on_disk = MsgOnDisk, - index_on_disk = IndexOnDisk }, - {CountN, MsgIdsAcc, SeqIdsAcc, IndexStateN}) -> - MsgIdsAcc1 = case MsgOnDisk of - true -> [MsgId | MsgIdsAcc]; - false -> MsgIdsAcc - end, + index_on_disk = IndexOnDisk, is_persistent = IsPersistent }, + {CountN, MsgIdsP, MsgIdsT, SeqIdsAcc, IndexStateN}) -> + {MsgIdsP1, MsgIdsT1} = + case {MsgOnDisk, IsPersistent} of + {true, true} -> {[MsgId | MsgIdsP], MsgIdsT}; + {true, false} -> {MsgIdsP, [MsgId | MsgIdsT]}; + {false, _} -> {MsgIdsP, MsgIdsT} + end, SeqIdsAcc1 = case IndexOnDisk of true -> [SeqId | SeqIdsAcc]; false -> SeqIdsAcc @@ -743,13 +768,13 @@ remove_queue_entries1( SeqId, IndexStateN); false -> IndexStateN end, - {CountN + 1, MsgIdsAcc1, SeqIdsAcc1, IndexStateN1}. + {CountN + 1, MsgIdsP1, MsgIdsT1, SeqIdsAcc1, IndexStateN1}. fetch_from_q3_or_delta(State = #vqstate { q1 = Q1, q2 = Q2, delta = #delta { count = DeltaCount }, q3 = Q3, q4 = Q4, ram_msg_count = RamMsgCount, ram_index_count = RamIndexCount, - msg_store_read_state = MSCState }) -> + msg_store_clients = MSCState }) -> case bpqueue:out(Q3) of {empty, _Q3} -> 0 = DeltaCount, %% ASSERTION @@ -761,7 +786,7 @@ fetch_from_q3_or_delta(State = #vqstate { is_persistent = IsPersistent }}, Q3a} -> {{ok, Msg = #basic_message { is_persistent = IsPersistent, guid = MsgId }}, MSCState1} = - rabbit_msg_store:read(MsgId, MSCState), + read_from_msg_store(MSCState, MsgId, IsPersistent), Q4a = queue:in(MsgStatus #msg_status { msg = Msg }, Q4), RamIndexCount1 = case IndexOnDisk of true -> RamIndexCount; @@ -771,7 +796,7 @@ fetch_from_q3_or_delta(State = #vqstate { State1 = State #vqstate { q3 = Q3a, q4 = Q4a, ram_msg_count = RamMsgCount + 1, ram_index_count = RamIndexCount1, - msg_store_read_state = MSCState1 }, + msg_store_clients = MSCState1 }, State2 = case {bpqueue:is_empty(Q3a), 0 == DeltaCount} of {true, true} -> @@ -857,19 +882,22 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, guid = MsgId }, in_counter = InCount + 1 })}. publish(msg, MsgStatus, State = #vqstate { index_state = IndexState, - ram_msg_count = RamMsgCount }) -> - MsgStatus1 = maybe_write_msg_to_disk(false, MsgStatus), + ram_msg_count = RamMsgCount, + msg_store_clients = MSCState }) -> + {MsgStatus1, MSCState1} = + maybe_write_msg_to_disk(false, MsgStatus, MSCState), {MsgStatus2, IndexState1} = maybe_write_index_to_disk(false, MsgStatus1, IndexState), State1 = State #vqstate { ram_msg_count = RamMsgCount + 1, - index_state = IndexState1 }, + index_state = IndexState1, + msg_store_clients = MSCState1 }, store_alpha_entry(MsgStatus2, State1); -publish(index, MsgStatus, State = - #vqstate { index_state = IndexState, q1 = Q1, - ram_index_count = RamIndexCount }) -> - MsgStatus1 = #msg_status { msg_on_disk = true } = - maybe_write_msg_to_disk(true, MsgStatus), +publish(index, MsgStatus, State = #vqstate { index_state = IndexState, q1 = Q1, + ram_index_count = RamIndexCount, + msg_store_clients = MSCState }) -> + {MsgStatus1 = #msg_status { msg_on_disk = true }, MSCState1} = + maybe_write_msg_to_disk(true, MsgStatus, MSCState), ForceIndex = should_force_index_to_disk(State), {MsgStatus2, IndexState1} = maybe_write_index_to_disk(ForceIndex, MsgStatus1, IndexState), @@ -878,15 +906,16 @@ publish(index, MsgStatus, State = false -> RamIndexCount + 1 end, State1 = State #vqstate { index_state = IndexState1, - ram_index_count = RamIndexCount1 }, + ram_index_count = RamIndexCount1, + msg_store_clients = MSCState1 }, true = queue:is_empty(Q1), %% ASSERTION store_beta_entry(MsgStatus2, State1); publish(neither, MsgStatus = #msg_status { seq_id = SeqId }, State = - #vqstate { index_state = IndexState, q1 = Q1, q2 = Q2, - delta = Delta }) -> - MsgStatus1 = #msg_status { msg_on_disk = true } = - maybe_write_msg_to_disk(true, MsgStatus), + #vqstate { index_state = IndexState, q1 = Q1, q2 = Q2, + delta = Delta, msg_store_clients = MSCState }) -> + {MsgStatus1 = #msg_status { msg_on_disk = true }, MSCState1} = + maybe_write_msg_to_disk(true, MsgStatus, MSCState), {#msg_status { index_on_disk = true }, IndexState1} = maybe_write_index_to_disk(true, MsgStatus1, IndexState), true = queue:is_empty(Q1) andalso bpqueue:is_empty(Q2), %% ASSERTION @@ -898,7 +927,8 @@ publish(neither, MsgStatus = #msg_status { seq_id = SeqId }, State = Delta1 = #delta { start_seq_id = DeltaSeqId, count = 1, end_seq_id = SeqId + 1 }, State #vqstate { index_state = IndexState1, - delta = combine_deltas(Delta, Delta1) }. + delta = combine_deltas(Delta, Delta1), + msg_store_clients = MSCState1 }. store_alpha_entry(MsgStatus, State = #vqstate { q1 = Q1, q2 = Q2, @@ -925,17 +955,42 @@ store_beta_entry(MsgStatus = #msg_status { msg_on_disk = true, State #vqstate { q2 = bpqueue:in(IndexOnDisk, MsgStatus1, Q2) } end. +find_msg_store(true) -> ?PERSISTENT_MSG_STORE; +find_msg_store(false) -> ?TRANSIENT_MSG_STORE. + +read_from_msg_store({MSCStateP, MSCStateT}, MsgId, true) -> + {Res, MSCStateP1} = + rabbit_msg_store:read(?PERSISTENT_MSG_STORE, MsgId, MSCStateP), + {Res, {MSCStateP1, MSCStateT}}; +read_from_msg_store({MSCStateP, MSCStateT}, MsgId, false) -> + {Res, MSCStateT1} = + rabbit_msg_store:read(?TRANSIENT_MSG_STORE, MsgId, MSCStateT), + {Res, {MSCStateP, MSCStateT1}}. + maybe_write_msg_to_disk(_Force, MsgStatus = - #msg_status { msg_on_disk = true }) -> - MsgStatus; + #msg_status { msg_on_disk = true }, MSCState) -> + {MsgStatus, MSCState}; maybe_write_msg_to_disk(Force, MsgStatus = #msg_status { msg = Msg, msg_id = MsgId, - is_persistent = IsPersistent }) + is_persistent = IsPersistent }, + {MSCStateP, MSCStateT}) when Force orelse IsPersistent -> - ok = rabbit_msg_store:write(MsgId, ensure_binary_properties(Msg)), - MsgStatus #msg_status { msg_on_disk = true }; -maybe_write_msg_to_disk(_Force, MsgStatus) -> - MsgStatus. + MSCState1 = + case IsPersistent of + true -> + {ok, MSCStateP1} = rabbit_msg_store:write( + ?PERSISTENT_MSG_STORE, MsgId, + ensure_binary_properties(Msg), MSCStateP), + {MSCStateP1, MSCStateT}; + false -> + {ok, MSCStateT1} = rabbit_msg_store:write( + ?TRANSIENT_MSG_STORE, MsgId, + ensure_binary_properties(Msg), MSCStateT), + {MSCStateP, MSCStateT1} + end, + {MsgStatus #msg_status { msg_on_disk = true }, MSCState1}; +maybe_write_msg_to_disk(_Force, MsgStatus, MSCState) -> + {MsgStatus, MSCState}. maybe_write_index_to_disk(_Force, MsgStatus = #msg_status { index_on_disk = true }, IndexState) -> @@ -1082,11 +1137,12 @@ maybe_push_alphas_to_betas(_Generator, _Consumer, _Q, State = maybe_push_alphas_to_betas( Generator, Consumer, Q, State = #vqstate { ram_msg_count = RamMsgCount, ram_index_count = RamIndexCount, - index_state = IndexState }) -> + index_state = IndexState, msg_store_clients = MSCState }) -> case Generator(Q) of {empty, _Q} -> State; {{value, MsgStatus}, Qa} -> - MsgStatus1 = maybe_write_msg_to_disk(true, MsgStatus), + {MsgStatus1, MSCState1} = maybe_write_msg_to_disk(true, MsgStatus, + MSCState), ForceIndex = should_force_index_to_disk(State), {MsgStatus2, IndexState1} = maybe_write_index_to_disk(ForceIndex, MsgStatus1, IndexState), @@ -1096,7 +1152,8 @@ maybe_push_alphas_to_betas( end, State1 = State #vqstate { ram_msg_count = RamMsgCount - 1, ram_index_count = RamIndexCount1, - index_state = IndexState1 }, + index_state = IndexState1, + msg_store_clients = MSCState1 }, maybe_push_alphas_to_betas(Generator, Consumer, Qa, Consumer(MsgStatus2, Qa, State1)) end. |
