summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDaniil Fedotov <dfedotov@pivotal.io>2016-03-10 13:29:08 +0000
committerDaniil Fedotov <dfedotov@pivotal.io>2016-10-20 14:49:52 +0100
commitabaa77062e27688f984ba8cb914ca69a9cb5e2e6 (patch)
treed2f9d7ac5873f18fca545da0fbfdcf98cd8275ea
parent3ad0b674caaa7a898f525933590acfd4f0d99d3c (diff)
downloadrabbitmq-server-git-abaa77062e27688f984ba8cb914ca69a9cb5e2e6.tar.gz
message store supervisor
-rw-r--r--src/rabbit_msg_store_vhost_sup.erl50
-rw-r--r--src/rabbit_variable_queue.erl30
2 files changed, 69 insertions, 11 deletions
diff --git a/src/rabbit_msg_store_vhost_sup.erl b/src/rabbit_msg_store_vhost_sup.erl
new file mode 100644
index 0000000000..299efdc428
--- /dev/null
+++ b/src/rabbit_msg_store_vhost_sup.erl
@@ -0,0 +1,50 @@
+-module(rabbit_msg_store_vhost_sup).
+
+-behaviour(supervisor2).
+
+-export([start_link/4, init/1, add_vhost/2, client_init/5, start_vhost/5]).
+
+start_link(Name, Dir, ClientRefs, StartupFunState) ->
+ supervisor2:start_link({local, Name}, ?MODULE,
+ [Name, Dir, ClientRefs, StartupFunState]).
+
+init([Name, Dir, ClientRefs, StartupFunState]) ->
+ {ok, {{simple_one_for_one, 0, 1},
+ [{rabbit_msg_store_vhost, {rabbit_msg_store_vhost_sup, start_vhost,
+ [Name, Dir, ClientRefs, StartupFunState]},
+ transient, infinity, supervisor, [rabbit_msg_store]}]}}.
+
+
+add_vhost(Name, VHost) ->
+ supervisor2:start_child(Name, [VHost]).
+
+start_vhost(Name, Dir, ClientRefs, StartupFunState, VHost) ->
+ VHostName = get_vhost_name(Name, VHost),
+ VHostDir = get_vhost_dir(Dir, VHost),
+ ok = rabbit_file:ensure_dir(VHostDir),
+ io:format("Store dir ~p~n", [VHostDir]),
+ rabbit_msg_store:start_link(VHostName, VHostDir,
+ ClientRefs, StartupFunState).
+
+
+client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun, VHost) ->
+ VHostName = maybe_start_vhost(Server, VHost),
+ rabbit_msg_store:client_init(VHostName, Ref, MsgOnDiskFun, CloseFDsFun).
+
+maybe_start_vhost(Server, VHost) ->
+ VHostName = get_vhost_name(Server, VHost),
+ Trace = try throw(42) catch 42 -> erlang:get_stacktrace() end,
+ io:format("Search for ~p~n ~p~n", [VHostName, Trace]),
+ case whereis(VHostName) of
+ undefined -> add_vhost(Server, VHost);
+ _ -> ok
+ end,
+ VHostName.
+
+get_vhost_name(Name, VHost) ->
+ binary_to_atom(<<(atom_to_binary(Name, utf8))/binary, VHost/binary>>, utf8).
+
+get_vhost_dir(Dir, VHost) ->
+ VhostEncoded = http_uri:encode(binary_to_list(VHost)),
+ filename:join([Dir, VhostEncoded]).
+
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 03381be311..6c97e85bc8 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -468,10 +468,10 @@ stop() ->
ok = rabbit_queue_index:stop().
start_msg_store(Refs, StartFunState) ->
- ok = rabbit_sup:start_child(?TRANSIENT_MSG_STORE, rabbit_msg_store,
+ ok = rabbit_sup:start_child(?TRANSIENT_MSG_STORE, rabbit_msg_store_vhost_sup,
[?TRANSIENT_MSG_STORE, rabbit_mnesia:dir(),
undefined, {fun (ok) -> finished end, ok}]),
- ok = rabbit_sup:start_child(?PERSISTENT_MSG_STORE, rabbit_msg_store,
+ ok = rabbit_sup:start_child(?PERSISTENT_MSG_STORE, rabbit_msg_store_vhost_sup,
[?PERSISTENT_MSG_STORE, rabbit_mnesia:dir(),
Refs, StartFunState]).
@@ -492,22 +492,26 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, new,
AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun) ->
IndexState = rabbit_queue_index:init(QueueName,
MsgIdxOnDiskFun, MsgAndIdxOnDiskFun),
+ VHost = QueueName#resource.virtual_host,
init(IsDurable, IndexState, 0, 0, [],
case IsDurable of
true -> msg_store_client_init(?PERSISTENT_MSG_STORE,
- MsgOnDiskFun, AsyncCallback);
+ MsgOnDiskFun, AsyncCallback, VHost);
false -> undefined
end,
- msg_store_client_init(?TRANSIENT_MSG_STORE, undefined, AsyncCallback));
+ msg_store_client_init(?TRANSIENT_MSG_STORE, undefined,
+ AsyncCallback, VHost));
%% We can be recovering a transient queue if it crashed
init(#amqqueue { name = QueueName, durable = IsDurable }, Terms,
AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun) ->
{PRef, RecoveryTerms} = process_recovery_terms(Terms),
+ VHost = QueueName#resource.virtual_host,
{PersistentClient, ContainsCheckFun} =
case IsDurable of
true -> C = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef,
- MsgOnDiskFun, AsyncCallback),
+ MsgOnDiskFun, AsyncCallback,
+ VHost),
{C, fun (MsgId) when is_binary(MsgId) ->
rabbit_msg_store:contains(MsgId, C);
(#basic_message{is_persistent = Persistent}) ->
@@ -516,7 +520,8 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, Terms,
false -> {undefined, fun(_MsgId) -> false end}
end,
TransientClient = msg_store_client_init(?TRANSIENT_MSG_STORE,
- undefined, AsyncCallback),
+ undefined, AsyncCallback,
+ VHost),
{DeltaCount, DeltaBytes, IndexState} =
rabbit_queue_index:recover(
QueueName, RecoveryTerms,
@@ -1188,14 +1193,17 @@ with_immutable_msg_store_state(MSCState, IsPersistent, Fun) ->
end),
Res.
-msg_store_client_init(MsgStore, MsgOnDiskFun, Callback) ->
+msg_store_client_init(MsgStore, MsgOnDiskFun, Callback, VHost) ->
msg_store_client_init(MsgStore, rabbit_guid:gen(), MsgOnDiskFun,
- Callback).
+ Callback, VHost).
-msg_store_client_init(MsgStore, Ref, MsgOnDiskFun, Callback) ->
+msg_store_client_init(MsgStore, Ref, MsgOnDiskFun, Callback, VHost) ->
CloseFDsFun = msg_store_close_fds_fun(MsgStore =:= ?PERSISTENT_MSG_STORE),
- rabbit_msg_store:client_init(MsgStore, Ref, MsgOnDiskFun,
- fun () -> Callback(?MODULE, CloseFDsFun) end).
+ rabbit_msg_store_vhost_sup:client_init(MsgStore, Ref, MsgOnDiskFun,
+ fun () ->
+ Callback(?MODULE, CloseFDsFun)
+ end,
+ VHost).
msg_store_write(MSCState, IsPersistent, MsgId, Msg) ->
with_immutable_msg_store_state(