diff options
| author | Daniil Fedotov <dfedotov@pivotal.io> | 2016-03-10 13:29:08 +0000 |
|---|---|---|
| committer | Daniil Fedotov <dfedotov@pivotal.io> | 2016-10-20 14:49:52 +0100 |
| commit | abaa77062e27688f984ba8cb914ca69a9cb5e2e6 (patch) | |
| tree | d2f9d7ac5873f18fca545da0fbfdcf98cd8275ea | |
| parent | 3ad0b674caaa7a898f525933590acfd4f0d99d3c (diff) | |
| download | rabbitmq-server-git-abaa77062e27688f984ba8cb914ca69a9cb5e2e6.tar.gz | |
message store supervisor
| -rw-r--r-- | src/rabbit_msg_store_vhost_sup.erl | 50 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 30 |
2 files changed, 69 insertions, 11 deletions
diff --git a/src/rabbit_msg_store_vhost_sup.erl b/src/rabbit_msg_store_vhost_sup.erl new file mode 100644 index 0000000000..299efdc428 --- /dev/null +++ b/src/rabbit_msg_store_vhost_sup.erl @@ -0,0 +1,50 @@ +-module(rabbit_msg_store_vhost_sup). + +-behaviour(supervisor2). + +-export([start_link/4, init/1, add_vhost/2, client_init/5, start_vhost/5]). + +start_link(Name, Dir, ClientRefs, StartupFunState) -> + supervisor2:start_link({local, Name}, ?MODULE, + [Name, Dir, ClientRefs, StartupFunState]). + +init([Name, Dir, ClientRefs, StartupFunState]) -> + {ok, {{simple_one_for_one, 0, 1}, + [{rabbit_msg_store_vhost, {rabbit_msg_store_vhost_sup, start_vhost, + [Name, Dir, ClientRefs, StartupFunState]}, + transient, infinity, supervisor, [rabbit_msg_store]}]}}. + + +add_vhost(Name, VHost) -> + supervisor2:start_child(Name, [VHost]). + +start_vhost(Name, Dir, ClientRefs, StartupFunState, VHost) -> + VHostName = get_vhost_name(Name, VHost), + VHostDir = get_vhost_dir(Dir, VHost), + ok = rabbit_file:ensure_dir(VHostDir), + io:format("Store dir ~p~n", [VHostDir]), + rabbit_msg_store:start_link(VHostName, VHostDir, + ClientRefs, StartupFunState). + + +client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun, VHost) -> + VHostName = maybe_start_vhost(Server, VHost), + rabbit_msg_store:client_init(VHostName, Ref, MsgOnDiskFun, CloseFDsFun). + +maybe_start_vhost(Server, VHost) -> + VHostName = get_vhost_name(Server, VHost), + Trace = try throw(42) catch 42 -> erlang:get_stacktrace() end, + io:format("Search for ~p~n ~p~n", [VHostName, Trace]), + case whereis(VHostName) of + undefined -> add_vhost(Server, VHost); + _ -> ok + end, + VHostName. + +get_vhost_name(Name, VHost) -> + binary_to_atom(<<(atom_to_binary(Name, utf8))/binary, VHost/binary>>, utf8). + +get_vhost_dir(Dir, VHost) -> + VhostEncoded = http_uri:encode(binary_to_list(VHost)), + filename:join([Dir, VhostEncoded]). + diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 03381be311..6c97e85bc8 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -468,10 +468,10 @@ stop() -> ok = rabbit_queue_index:stop(). start_msg_store(Refs, StartFunState) -> - ok = rabbit_sup:start_child(?TRANSIENT_MSG_STORE, rabbit_msg_store, + ok = rabbit_sup:start_child(?TRANSIENT_MSG_STORE, rabbit_msg_store_vhost_sup, [?TRANSIENT_MSG_STORE, rabbit_mnesia:dir(), undefined, {fun (ok) -> finished end, ok}]), - ok = rabbit_sup:start_child(?PERSISTENT_MSG_STORE, rabbit_msg_store, + ok = rabbit_sup:start_child(?PERSISTENT_MSG_STORE, rabbit_msg_store_vhost_sup, [?PERSISTENT_MSG_STORE, rabbit_mnesia:dir(), Refs, StartFunState]). @@ -492,22 +492,26 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, new, AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun) -> IndexState = rabbit_queue_index:init(QueueName, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun), + VHost = QueueName#resource.virtual_host, init(IsDurable, IndexState, 0, 0, [], case IsDurable of true -> msg_store_client_init(?PERSISTENT_MSG_STORE, - MsgOnDiskFun, AsyncCallback); + MsgOnDiskFun, AsyncCallback, VHost); false -> undefined end, - msg_store_client_init(?TRANSIENT_MSG_STORE, undefined, AsyncCallback)); + msg_store_client_init(?TRANSIENT_MSG_STORE, undefined, + AsyncCallback, VHost)); %% We can be recovering a transient queue if it crashed init(#amqqueue { name = QueueName, durable = IsDurable }, Terms, AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun) -> {PRef, RecoveryTerms} = process_recovery_terms(Terms), + VHost = QueueName#resource.virtual_host, {PersistentClient, ContainsCheckFun} = case IsDurable of true -> C = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef, - MsgOnDiskFun, AsyncCallback), + MsgOnDiskFun, AsyncCallback, + VHost), {C, fun (MsgId) when is_binary(MsgId) -> rabbit_msg_store:contains(MsgId, C); (#basic_message{is_persistent = Persistent}) -> @@ -516,7 +520,8 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, Terms, false -> {undefined, fun(_MsgId) -> false end} end, TransientClient = msg_store_client_init(?TRANSIENT_MSG_STORE, - undefined, AsyncCallback), + undefined, AsyncCallback, + VHost), {DeltaCount, DeltaBytes, IndexState} = rabbit_queue_index:recover( QueueName, RecoveryTerms, @@ -1188,14 +1193,17 @@ with_immutable_msg_store_state(MSCState, IsPersistent, Fun) -> end), Res. -msg_store_client_init(MsgStore, MsgOnDiskFun, Callback) -> +msg_store_client_init(MsgStore, MsgOnDiskFun, Callback, VHost) -> msg_store_client_init(MsgStore, rabbit_guid:gen(), MsgOnDiskFun, - Callback). + Callback, VHost). -msg_store_client_init(MsgStore, Ref, MsgOnDiskFun, Callback) -> +msg_store_client_init(MsgStore, Ref, MsgOnDiskFun, Callback, VHost) -> CloseFDsFun = msg_store_close_fds_fun(MsgStore =:= ?PERSISTENT_MSG_STORE), - rabbit_msg_store:client_init(MsgStore, Ref, MsgOnDiskFun, - fun () -> Callback(?MODULE, CloseFDsFun) end). + rabbit_msg_store_vhost_sup:client_init(MsgStore, Ref, MsgOnDiskFun, + fun () -> + Callback(?MODULE, CloseFDsFun) + end, + VHost). msg_store_write(MSCState, IsPersistent, MsgId, Msg) -> with_immutable_msg_store_state( |
