diff options
| -rw-r--r-- | src/rabbit_msg_store.erl | 108 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 1 |
2 files changed, 58 insertions, 51 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 1d53787a6d..b2768a1337 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -36,7 +36,7 @@ -export([start_link/4, write/4, read/3, contains/2, remove/2, release/2, sync/3, client_init/2, client_terminate/1, client_delete_and_terminate/3, successfully_recovered_state/1, - register_sync_callback/2]). + register_sync_callback/3]). -export([sync/1, gc_done/4, set_maximum_since_use/2, gc/3]). %% internal @@ -84,8 +84,8 @@ client_refs, %% set of references of all registered clients successfully_recovered, %% boolean: did we recover state? file_size_limit, %% how big are our files allowed to get? - pid_to_fun, %% pid to callback function mapping - pid_to_guids %% pid to synced messages mapping + client_ondisk_callback, %% client ref to callback function mapping + cref_to_guids %% client ref to synced messages mapping }). -record(client_msstate, @@ -97,7 +97,8 @@ file_handles_ets, file_summary_ets, dedup_cache_ets, - cur_file_cache_ets + cur_file_cache_ets, + client_ref }). -record(file_summary, @@ -119,7 +120,8 @@ file_handles_ets :: ets:tid(), file_summary_ets :: ets:tid(), dedup_cache_ets :: ets:tid(), - cur_file_cache_ets :: ets:tid() }). + cur_file_cache_ets :: ets:tid(), + client_ref :: rabbit_guid:guid()}). -type(startup_fun_state() :: {(fun ((A) -> 'finished' | {rabbit_guid:guid(), non_neg_integer(), A})), A}). @@ -127,8 +129,9 @@ -spec(start_link/4 :: (atom(), file:filename(), [binary()] | 'undefined', startup_fun_state()) -> rabbit_types:ok_pid_or_error()). --spec(write/4 :: (server(), rabbit_guid:guid(), msg(), client_msstate()) -> - rabbit_types:ok(client_msstate())). +-spec(write/4 :: (server(), rabbit_guid:guid(), + msg(), client_msstate()) + -> rabbit_types:ok(client_msstate())). -spec(read/3 :: (server(), rabbit_guid:guid(), client_msstate()) -> {rabbit_types:ok(msg()) | 'not_found', client_msstate()}). -spec(contains/2 :: (server(), rabbit_guid:guid()) -> boolean()). @@ -138,10 +141,10 @@ -spec(gc_done/4 :: (server(), non_neg_integer(), file_num(), file_num()) -> 'ok'). -spec(set_maximum_since_use/2 :: (server(), non_neg_integer()) -> 'ok'). --spec(client_init/2 :: (server(), binary()) -> client_msstate()). +-spec(client_init/2 :: (server(), rabbit_guid:guid()) -> client_msstate()). -spec(client_terminate/1 :: (client_msstate()) -> 'ok'). -spec(client_delete_and_terminate/3 :: - (client_msstate(), server(), binary()) -> 'ok'). + (client_msstate(), server(), rabbit_guid:guid()) -> 'ok'). -spec(successfully_recovered_state/1 :: (server()) -> boolean()). -spec(gc/3 :: (non_neg_integer(), non_neg_integer(), @@ -318,9 +321,10 @@ start_link(Server, Dir, ClientRefs, StartupFunState) -> [{timeout, infinity}]). write(Server, Guid, Msg, - CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts }) -> + CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts, + client_ref = CRef }) -> ok = update_msg_cache(CurFileCacheEts, Guid, Msg), - {gen_server2:cast(Server, {write, self(), Guid}), CState}. + {gen_server2:cast(Server, {write, CRef, Guid}), CState}. read(Server, Guid, CState = #client_msstate { dedup_cache_ets = DedupCacheEts, @@ -374,12 +378,13 @@ client_init(Server, Ref) -> file_handles_ets = FileHandlesEts, file_summary_ets = FileSummaryEts, dedup_cache_ets = DedupCacheEts, - cur_file_cache_ets = CurFileCacheEts }. + cur_file_cache_ets = CurFileCacheEts, + client_ref = Ref}. client_terminate(CState) -> Self = self(), spawn(fun() -> - gen_server2:call(self(), {client_terminate, CState}) + gen_server2:call(Self, {client_terminate, CState}) end), ok. @@ -390,8 +395,8 @@ client_delete_and_terminate(CState, Server, Ref) -> successfully_recovered_state(Server) -> gen_server2:pcall(Server, 7, successfully_recovered_state, infinity). -register_sync_callback(Server, Fun) -> - gen_server2:call(Server, {register_sync_callback, Fun}, infinity). +register_sync_callback(Server, ClientRef, Fun) -> + gen_server2:call(Server, {register_sync_callback, ClientRef, Fun}, infinity). %%---------------------------------------------------------------------------- %% Client-side-only helpers @@ -569,8 +574,8 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> client_refs = ClientRefs1, successfully_recovered = CleanShutdown, file_size_limit = FileSizeLimit, - pid_to_fun = dict:new(), - pid_to_guids = dict:new() + client_ondisk_callback = dict:new(), + cref_to_guids = dict:new() }, %% If we didn't recover the msg location index then we need to @@ -617,28 +622,29 @@ handle_call({new_client_state, CRef}, _From, handle_call(successfully_recovered_state, _From, State) -> reply(State #msstate.successfully_recovered, State); -handle_call({register_sync_callback, Fun}, {Pid, _}, - State = #msstate { pid_to_fun = PTF }) -> - reply(ok, - State #msstate { pid_to_fun = dict:store(Pid, Fun, PTF) }); - -handle_call({client_terminate, CState}, {Pid, _}, - State = #msstate { pid_to_fun = PTF, - pid_to_guids = PTG }) -> +handle_call({register_sync_callback, ClientRef, Fun}, _From, + State = #msstate { client_ondisk_callback = CODC }) -> + reply(ok, State #msstate { client_ondisk_callback = + dict:store(ClientRef, Fun, CODC) }); +handle_call({client_terminate, CState = #client_msstate { client_ref = CRef }}, + _From, + State = #msstate { client_ondisk_callback = CODC, + cref_to_guids = CTG }) -> ok = close_all_handles(CState), reply(ok, - State #msstate { pid_to_fun = dict:erase(Pid, PTF), - pid_to_guids = dict:erase(Pid, PTG) }). - -handle_cast({write, Pid, Guid}, - State = #msstate { current_file_handle = CurHdl, - current_file = CurFile, - sum_valid_data = SumValid, - sum_file_size = SumFileSize, - file_summary_ets = FileSummaryEts, - cur_file_cache_ets = CurFileCacheEts, - pid_to_fun = PTF, - pid_to_guids = PTG}) -> + State #msstate { client_ondisk_callback = dict:erase(CRef, CODC), + cref_to_guids = dict:erase(CRef, CTG) }). + +handle_cast({write, CRef, Guid}, + State = #msstate { current_file_handle = CurHdl, + current_file = CurFile, + sum_valid_data = SumValid, + sum_file_size = SumFileSize, + file_summary_ets = FileSummaryEts, + cur_file_cache_ets = CurFileCacheEts, + client_ondisk_callback = CODC, + cref_to_guids = CTG}) -> + true = 0 =< ets:update_counter(CurFileCacheEts, Guid, {3, -1}), [{Guid, Msg, _CacheRefCount}] = ets:lookup(CurFileCacheEts, Guid), case index_lookup(Guid, State) of @@ -672,10 +678,10 @@ handle_cast({write, Pid, Guid}, NextOffset, State #msstate { sum_valid_data = SumValid + TotalSize, sum_file_size = SumFileSize + TotalSize, - pid_to_guids = - case dict:find(Pid, PTF) of - {ok, _} -> rabbit_misc:dict_cons(Pid, Guid, PTG); - error -> PTG + cref_to_guids = + case dict:find(CRef, CODC) of + {ok, _} -> rabbit_misc:dict_cons(CRef, Guid, CTG); + error -> CRef end})); #msg_location { ref_count = RefCount } -> %% We already know about it, just update counter. Only @@ -809,14 +815,14 @@ reply(Reply, State) -> next_state(State = #msstate { sync_timer_ref = undefined, on_sync = OS, - pid_to_guids = PTG }) -> - case {OS, dict:size(PTG)} of + cref_to_guids = CTG }) -> + case {OS, dict:size(CTG)} of {[], 0} -> {State, hibernate}; _ -> {start_sync_timer(State), 0} end; next_state(State = #msstate { on_sync = OS, - pid_to_guids = PTG }) -> - case {OS, dict:size(PTG)} of + cref_to_guids = CTG }) -> + case {OS, dict:size(CTG)} of {[], 0} -> {stop_sync_timer(State), hibernate}; _ -> {State, 0} end. @@ -833,8 +839,8 @@ stop_sync_timer(State = #msstate { sync_timer_ref = TRef }) -> internal_sync(State = #msstate { current_file_handle = CurHdl, on_sync = Syncs, - pid_to_fun = PTF, - pid_to_guids = PTG }) -> + client_ondisk_callback = CODC, + cref_to_guids = CTG }) -> State1 = stop_sync_timer(State), State2 = case Syncs of [] -> State1; @@ -842,13 +848,13 @@ internal_sync(State = #msstate { current_file_handle = CurHdl, lists:foreach(fun (K) -> K() end, lists:reverse(Syncs)), State1 #msstate { on_sync = [] } end, - dict:map(fun(Pid, Guids) -> - case dict:find(Pid, PTF) of + dict:map(fun(CRef, Guids) -> + case dict:find(CRef, CODC) of {ok, Fun} -> Fun(Guids); error -> ok %% shouldn't happen end - end, PTG), - State2 #msstate { pid_to_guids = dict:new() }. + end, CTG), + State2 #msstate { cref_to_guids = dict:new() }. read_message(Guid, From, diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 9e7b1d2c7b..62bd38f2d8 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -409,6 +409,7 @@ init(QueueName, IsDurable, Recover) -> Self = self(), rabbit_msg_store:register_sync_callback( ?PERSISTENT_MSG_STORE, + PRef, fun (Guids) -> msgs_written_to_disk(Self, Guids) end), |
