summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_msg_store.erl108
-rw-r--r--src/rabbit_variable_queue.erl1
2 files changed, 58 insertions, 51 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 1d53787a6d..b2768a1337 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -36,7 +36,7 @@
-export([start_link/4, write/4, read/3, contains/2, remove/2, release/2,
sync/3, client_init/2, client_terminate/1,
client_delete_and_terminate/3, successfully_recovered_state/1,
- register_sync_callback/2]).
+ register_sync_callback/3]).
-export([sync/1, gc_done/4, set_maximum_since_use/2, gc/3]). %% internal
@@ -84,8 +84,8 @@
client_refs, %% set of references of all registered clients
successfully_recovered, %% boolean: did we recover state?
file_size_limit, %% how big are our files allowed to get?
- pid_to_fun, %% pid to callback function mapping
- pid_to_guids %% pid to synced messages mapping
+ client_ondisk_callback, %% client ref to callback function mapping
+ cref_to_guids %% client ref to synced messages mapping
}).
-record(client_msstate,
@@ -97,7 +97,8 @@
file_handles_ets,
file_summary_ets,
dedup_cache_ets,
- cur_file_cache_ets
+ cur_file_cache_ets,
+ client_ref
}).
-record(file_summary,
@@ -119,7 +120,8 @@
file_handles_ets :: ets:tid(),
file_summary_ets :: ets:tid(),
dedup_cache_ets :: ets:tid(),
- cur_file_cache_ets :: ets:tid() }).
+ cur_file_cache_ets :: ets:tid(),
+ client_ref :: rabbit_guid:guid()}).
-type(startup_fun_state() ::
{(fun ((A) -> 'finished' | {rabbit_guid:guid(), non_neg_integer(), A})),
A}).
@@ -127,8 +129,9 @@
-spec(start_link/4 ::
(atom(), file:filename(), [binary()] | 'undefined',
startup_fun_state()) -> rabbit_types:ok_pid_or_error()).
--spec(write/4 :: (server(), rabbit_guid:guid(), msg(), client_msstate()) ->
- rabbit_types:ok(client_msstate())).
+-spec(write/4 :: (server(), rabbit_guid:guid(),
+ msg(), client_msstate())
+ -> rabbit_types:ok(client_msstate())).
-spec(read/3 :: (server(), rabbit_guid:guid(), client_msstate()) ->
{rabbit_types:ok(msg()) | 'not_found', client_msstate()}).
-spec(contains/2 :: (server(), rabbit_guid:guid()) -> boolean()).
@@ -138,10 +141,10 @@
-spec(gc_done/4 :: (server(), non_neg_integer(), file_num(), file_num()) ->
'ok').
-spec(set_maximum_since_use/2 :: (server(), non_neg_integer()) -> 'ok').
--spec(client_init/2 :: (server(), binary()) -> client_msstate()).
+-spec(client_init/2 :: (server(), rabbit_guid:guid()) -> client_msstate()).
-spec(client_terminate/1 :: (client_msstate()) -> 'ok').
-spec(client_delete_and_terminate/3 ::
- (client_msstate(), server(), binary()) -> 'ok').
+ (client_msstate(), server(), rabbit_guid:guid()) -> 'ok').
-spec(successfully_recovered_state/1 :: (server()) -> boolean()).
-spec(gc/3 :: (non_neg_integer(), non_neg_integer(),
@@ -318,9 +321,10 @@ start_link(Server, Dir, ClientRefs, StartupFunState) ->
[{timeout, infinity}]).
write(Server, Guid, Msg,
- CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts }) ->
+ CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts,
+ client_ref = CRef }) ->
ok = update_msg_cache(CurFileCacheEts, Guid, Msg),
- {gen_server2:cast(Server, {write, self(), Guid}), CState}.
+ {gen_server2:cast(Server, {write, CRef, Guid}), CState}.
read(Server, Guid,
CState = #client_msstate { dedup_cache_ets = DedupCacheEts,
@@ -374,12 +378,13 @@ client_init(Server, Ref) ->
file_handles_ets = FileHandlesEts,
file_summary_ets = FileSummaryEts,
dedup_cache_ets = DedupCacheEts,
- cur_file_cache_ets = CurFileCacheEts }.
+ cur_file_cache_ets = CurFileCacheEts,
+ client_ref = Ref}.
client_terminate(CState) ->
Self = self(),
spawn(fun() ->
- gen_server2:call(self(), {client_terminate, CState})
+ gen_server2:call(Self, {client_terminate, CState})
end),
ok.
@@ -390,8 +395,8 @@ client_delete_and_terminate(CState, Server, Ref) ->
successfully_recovered_state(Server) ->
gen_server2:pcall(Server, 7, successfully_recovered_state, infinity).
-register_sync_callback(Server, Fun) ->
- gen_server2:call(Server, {register_sync_callback, Fun}, infinity).
+register_sync_callback(Server, ClientRef, Fun) ->
+ gen_server2:call(Server, {register_sync_callback, ClientRef, Fun}, infinity).
%%----------------------------------------------------------------------------
%% Client-side-only helpers
@@ -569,8 +574,8 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
client_refs = ClientRefs1,
successfully_recovered = CleanShutdown,
file_size_limit = FileSizeLimit,
- pid_to_fun = dict:new(),
- pid_to_guids = dict:new()
+ client_ondisk_callback = dict:new(),
+ cref_to_guids = dict:new()
},
%% If we didn't recover the msg location index then we need to
@@ -617,28 +622,29 @@ handle_call({new_client_state, CRef}, _From,
handle_call(successfully_recovered_state, _From, State) ->
reply(State #msstate.successfully_recovered, State);
-handle_call({register_sync_callback, Fun}, {Pid, _},
- State = #msstate { pid_to_fun = PTF }) ->
- reply(ok,
- State #msstate { pid_to_fun = dict:store(Pid, Fun, PTF) });
-
-handle_call({client_terminate, CState}, {Pid, _},
- State = #msstate { pid_to_fun = PTF,
- pid_to_guids = PTG }) ->
+handle_call({register_sync_callback, ClientRef, Fun}, _From,
+ State = #msstate { client_ondisk_callback = CODC }) ->
+ reply(ok, State #msstate { client_ondisk_callback =
+ dict:store(ClientRef, Fun, CODC) });
+handle_call({client_terminate, CState = #client_msstate { client_ref = CRef }},
+ _From,
+ State = #msstate { client_ondisk_callback = CODC,
+ cref_to_guids = CTG }) ->
ok = close_all_handles(CState),
reply(ok,
- State #msstate { pid_to_fun = dict:erase(Pid, PTF),
- pid_to_guids = dict:erase(Pid, PTG) }).
-
-handle_cast({write, Pid, Guid},
- State = #msstate { current_file_handle = CurHdl,
- current_file = CurFile,
- sum_valid_data = SumValid,
- sum_file_size = SumFileSize,
- file_summary_ets = FileSummaryEts,
- cur_file_cache_ets = CurFileCacheEts,
- pid_to_fun = PTF,
- pid_to_guids = PTG}) ->
+ State #msstate { client_ondisk_callback = dict:erase(CRef, CODC),
+ cref_to_guids = dict:erase(CRef, CTG) }).
+
+handle_cast({write, CRef, Guid},
+ State = #msstate { current_file_handle = CurHdl,
+ current_file = CurFile,
+ sum_valid_data = SumValid,
+ sum_file_size = SumFileSize,
+ file_summary_ets = FileSummaryEts,
+ cur_file_cache_ets = CurFileCacheEts,
+ client_ondisk_callback = CODC,
+ cref_to_guids = CTG}) ->
+
true = 0 =< ets:update_counter(CurFileCacheEts, Guid, {3, -1}),
[{Guid, Msg, _CacheRefCount}] = ets:lookup(CurFileCacheEts, Guid),
case index_lookup(Guid, State) of
@@ -672,10 +678,10 @@ handle_cast({write, Pid, Guid},
NextOffset, State #msstate {
sum_valid_data = SumValid + TotalSize,
sum_file_size = SumFileSize + TotalSize,
- pid_to_guids =
- case dict:find(Pid, PTF) of
- {ok, _} -> rabbit_misc:dict_cons(Pid, Guid, PTG);
- error -> PTG
+ cref_to_guids =
+ case dict:find(CRef, CODC) of
+ {ok, _} -> rabbit_misc:dict_cons(CRef, Guid, CTG);
+ error -> CRef
end}));
#msg_location { ref_count = RefCount } ->
%% We already know about it, just update counter. Only
@@ -809,14 +815,14 @@ reply(Reply, State) ->
next_state(State = #msstate { sync_timer_ref = undefined,
on_sync = OS,
- pid_to_guids = PTG }) ->
- case {OS, dict:size(PTG)} of
+ cref_to_guids = CTG }) ->
+ case {OS, dict:size(CTG)} of
{[], 0} -> {State, hibernate};
_ -> {start_sync_timer(State), 0}
end;
next_state(State = #msstate { on_sync = OS,
- pid_to_guids = PTG }) ->
- case {OS, dict:size(PTG)} of
+ cref_to_guids = CTG }) ->
+ case {OS, dict:size(CTG)} of
{[], 0} -> {stop_sync_timer(State), hibernate};
_ -> {State, 0}
end.
@@ -833,8 +839,8 @@ stop_sync_timer(State = #msstate { sync_timer_ref = TRef }) ->
internal_sync(State = #msstate { current_file_handle = CurHdl,
on_sync = Syncs,
- pid_to_fun = PTF,
- pid_to_guids = PTG }) ->
+ client_ondisk_callback = CODC,
+ cref_to_guids = CTG }) ->
State1 = stop_sync_timer(State),
State2 = case Syncs of
[] -> State1;
@@ -842,13 +848,13 @@ internal_sync(State = #msstate { current_file_handle = CurHdl,
lists:foreach(fun (K) -> K() end, lists:reverse(Syncs)),
State1 #msstate { on_sync = [] }
end,
- dict:map(fun(Pid, Guids) ->
- case dict:find(Pid, PTF) of
+ dict:map(fun(CRef, Guids) ->
+ case dict:find(CRef, CODC) of
{ok, Fun} -> Fun(Guids);
error -> ok %% shouldn't happen
end
- end, PTG),
- State2 #msstate { pid_to_guids = dict:new() }.
+ end, CTG),
+ State2 #msstate { cref_to_guids = dict:new() }.
read_message(Guid, From,
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 9e7b1d2c7b..62bd38f2d8 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -409,6 +409,7 @@ init(QueueName, IsDurable, Recover) ->
Self = self(),
rabbit_msg_store:register_sync_callback(
?PERSISTENT_MSG_STORE,
+ PRef,
fun (Guids) ->
msgs_written_to_disk(Self, Guids)
end),