summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_msg_store.erl32
-rw-r--r--src/rabbit_msg_store_vhost_sup.erl77
-rw-r--r--test/channel_operation_timeout_test_queue.erl2
3 files changed, 65 insertions, 46 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index b0beb975e8..1e929c7f7c 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -474,15 +474,15 @@
%% public API
%%----------------------------------------------------------------------------
-start_link(Server, Dir, ClientRefs, StartupFunState) ->
- gen_server2:start_link({local, Server}, ?MODULE,
- [Server, Dir, ClientRefs, StartupFunState],
+start_link(Name, Dir, ClientRefs, StartupFunState) when is_atom(Name) ->
+ gen_server2:start_link(?MODULE,
+ [Name, Dir, ClientRefs, StartupFunState],
[{timeout, infinity}]).
successfully_recovered_state(Server) ->
gen_server2:call(Server, successfully_recovered_state, infinity).
-client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun) ->
+client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun) when is_pid(Server) ->
{IState, IModule, Dir, GCPid,
FileHandlesEts, FileSummaryEts, CurFileCacheEts, FlyingEts} =
gen_server2:call(
@@ -522,7 +522,7 @@ write_flow(MsgId, Msg,
%% rabbit_amqqueue_process process via the
%% rabbit_variable_queue. We are accessing the
%% rabbit_amqqueue_process process dictionary.
- credit_flow:send(whereis(Server), CreditDiscBound),
+ credit_flow:send(Server, CreditDiscBound),
client_write(MsgId, Msg, flow, CState).
write(MsgId, Msg, CState) -> client_write(MsgId, Msg, noflow, CState).
@@ -548,7 +548,7 @@ remove(MsgIds, CState = #client_msstate { client_ref = CRef }) ->
[client_update_flying(-1, MsgId, CState) || MsgId <- MsgIds],
server_cast(CState, {remove, CRef, MsgIds}).
-set_maximum_since_use(Server, Age) ->
+set_maximum_since_use(Server, Age) when is_pid(Server) ->
gen_server2:cast(Server, {set_maximum_since_use, Age}).
%%----------------------------------------------------------------------------
@@ -710,16 +710,16 @@ clear_client(CRef, State = #msstate { cref_to_msg_ids = CTM,
%% gen_server callbacks
%%----------------------------------------------------------------------------
-init([Server, BaseDir, ClientRefs, StartupFunState]) ->
+init([Name, BaseDir, ClientRefs, StartupFunState]) ->
process_flag(trap_exit, true),
ok = file_handle_cache:register_callback(?MODULE, set_maximum_since_use,
[self()]),
- Dir = filename:join(BaseDir, atom_to_list(Server)),
+ Dir = filename:join(BaseDir, atom_to_list(Name)),
{ok, IndexModule} = application:get_env(rabbit, msg_store_index_module),
- rabbit_log:info("~w: using ~p to provide index~n", [Server, IndexModule]),
+ rabbit_log:info("~tp: using ~p to provide index~n", [Dir, IndexModule]),
AttemptFileSummaryRecovery =
case ClientRefs of
@@ -738,7 +738,7 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
{CleanShutdown, IndexState, ClientRefs1} =
recover_index_and_client_refs(IndexModule, FileSummaryRecovered,
- ClientRefs, Dir, Server),
+ ClientRefs, Dir),
Clients = dict:from_list(
[{CRef, {undefined, undefined, undefined}} ||
CRef <- ClientRefs1]),
@@ -1551,16 +1551,16 @@ index_delete_by_file(File, #msstate { index_module = Index,
%% shutdown and recovery
%%----------------------------------------------------------------------------
-recover_index_and_client_refs(IndexModule, _Recover, undefined, Dir, _Server) ->
+recover_index_and_client_refs(IndexModule, _Recover, undefined, Dir) ->
{false, IndexModule:new(Dir), []};
-recover_index_and_client_refs(IndexModule, false, _ClientRefs, Dir, Server) ->
- rabbit_log:warning("~w: rebuilding indices from scratch~n", [Server]),
+recover_index_and_client_refs(IndexModule, false, _ClientRefs, Dir) ->
+ rabbit_log:warning("~tp : rebuilding indices from scratch~n", [Dir]),
{false, IndexModule:new(Dir), []};
-recover_index_and_client_refs(IndexModule, true, ClientRefs, Dir, Server) ->
+recover_index_and_client_refs(IndexModule, true, ClientRefs, Dir) ->
Fresh = fun (ErrorMsg, ErrorArgs) ->
- rabbit_log:warning("~w: " ++ ErrorMsg ++ "~n"
+ rabbit_log:warning("~tp : " ++ ErrorMsg ++ "~n"
"rebuilding indices from scratch~n",
- [Server | ErrorArgs]),
+ [Dir | ErrorArgs]),
{false, IndexModule:new(Dir), []}
end,
case read_recovery_terms(Dir) of
diff --git a/src/rabbit_msg_store_vhost_sup.erl b/src/rabbit_msg_store_vhost_sup.erl
index a5be431093..5784f199ca 100644
--- a/src/rabbit_msg_store_vhost_sup.erl
+++ b/src/rabbit_msg_store_vhost_sup.erl
@@ -13,6 +13,7 @@ start_link(Name, ClientRefs, StartupFunState) ->
[Name, ClientRefs, StartupFunState]).
init([Name, ClientRefs, StartupFunState]) ->
+ ets:new(Name, [named_table, public]),
{ok, {{simple_one_for_one, 1, 1},
[{rabbit_msg_store_vhost, {rabbit_msg_store_vhost_sup, start_vhost,
[Name, ClientRefs, StartupFunState]},
@@ -23,37 +24,54 @@ add_vhost(Name, VHost) ->
supervisor2:start_child(Name, [VHost]).
start_vhost(Name, ClientRefs, StartupFunState, VHost) ->
- VHostName = vhost_store_name(Name, VHost),
- VHostDir = vhost_store_dir(VHost),
- ok = rabbit_file:ensure_dir(VHostDir),
- rabbit_msg_store:start_link(VHostName, VHostDir,
- ClientRefs, StartupFunState).
+ case vhost_store_pid(Name, VHost) of
+ no_pid ->
+ VHostDir = vhost_store_dir(VHost),
+ ok = rabbit_file:ensure_dir(VHostDir),
+ case rabbit_msg_store:start_link(Name, VHostDir, ClientRefs, StartupFunState) of
+ {ok, Pid} ->
+ ets:insert(Name, {VHost, Pid}),
+ {ok, Pid};
+ Other -> Other
+ end;
+ Pid when is_pid(Pid) ->
+ {error, {already_started, Pid}}
+ end.
delete_vhost(Name, VHost) ->
- VHostName = vhost_store_name(Name, VHost),
- case whereis(VHostName) of
- undefined -> ok;
- Pid -> supervisor2:terminate_child(Name, Pid)
+ case vhost_store_pid(Name, VHost) of
+ no_pid -> ok;
+ Pid when is_pid(Pid) ->
+ supervisor2:terminate_child(Name, Pid),
+ cleanup_vhost_store(Name, VHost, Pid)
end,
ok.
-client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun, VHost) ->
- VHostName = maybe_start_vhost(Server, VHost),
- rabbit_msg_store:client_init(VHostName, Ref, MsgOnDiskFun, CloseFDsFun).
+client_init(Name, Ref, MsgOnDiskFun, CloseFDsFun, VHost) ->
+ VHostPid = maybe_start_vhost(Name, VHost),
+ rabbit_msg_store:client_init(VHostPid, Ref, MsgOnDiskFun, CloseFDsFun).
-maybe_start_vhost(Server, VHost) ->
- VHostName = vhost_store_name(Server, VHost),
- case whereis(VHostName) of
- undefined -> add_vhost(Server, VHost);
- _ -> ok
- end,
- VHostName.
+maybe_start_vhost(Name, VHost) ->
+ case add_vhost(Name, VHost) of
+ {ok, Pid} -> Pid;
+ {error, {already_started, Pid}} -> Pid;
+ Error -> throw(Error)
+ end.
-vhost_store_name(Name, VHost) ->
- Base64EncodedName = rabbit_vhost:dir(VHost),
- binary_to_atom(<<(atom_to_binary(Name, utf8))/binary, "_",
- Base64EncodedName/binary>>,
- utf8).
+vhost_store_pid(Name, VHost) ->
+ case ets:lookup(Name, VHost) of
+ [] -> no_pid;
+ [Pid] ->
+ case erlang:is_process_alive(Pid) of
+ true -> Pid;
+ false ->
+ cleanup_vhost_store(Name, VHost, Pid),
+ no_pid
+ end
+ end.
+
+cleanup_vhost_store(Name, VHost, Pid) ->
+ ets:delete_object(Name, {VHost, Pid}).
vhost_store_dir(VHost) ->
Dir = rabbit_mnesia:dir(),
@@ -61,8 +79,9 @@ vhost_store_dir(VHost) ->
binary_to_list(filename:join([Dir, Base64EncodedName])).
successfully_recovered_state(Name, VHost) ->
- VHostName = vhost_store_name(Name, VHost),
- rabbit_msg_store:successfully_recovered_state(VHostName).
-
-% force_recovery
-% transform_dir
+ case vhost_store_pid(Name, VHost) of
+ no_pid ->
+ throw({message_store_not_started, Name, VHost});
+ Pid when is_pid(Pid) ->
+ rabbit_msg_store:successfully_recovered_state(Pid)
+ end.
diff --git a/test/channel_operation_timeout_test_queue.erl b/test/channel_operation_timeout_test_queue.erl
index 07b1235672..5e256b1381 100644
--- a/test/channel_operation_timeout_test_queue.erl
+++ b/test/channel_operation_timeout_test_queue.erl
@@ -296,7 +296,7 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, Terms,
{DeltaCount, DeltaBytes, IndexState} =
rabbit_queue_index:recover(
QueueName, RecoveryTerms,
- rabbit_msg_store_vhost_sup:successfully_recovered_state(?PERSISTENT_MSG_STORE),
+ rabbit_msg_store_vhost_sup:successfully_recovered_state(?PERSISTENT_MSG_STORE, VHost),
ContainsCheckFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun),
init(IsDurable, IndexState, DeltaCount, DeltaBytes, RecoveryTerms,
PersistentClient, TransientClient).