summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDaniil Fedotov <dfedotov@pivotal.io>2016-10-19 11:16:34 +0100
committerDaniil Fedotov <dfedotov@pivotal.io>2016-10-20 14:49:52 +0100
commite6c76fd377c92cd9c4e1cdd6ab6ed6da785a9271 (patch)
tree52b8bee34ab6f0f2dbe40910697d3db030518f7b
parentfdd7b871df020cd138623bd8bffab7077440cbd5 (diff)
downloadrabbitmq-server-git-e6c76fd377c92cd9c4e1cdd6ab6ed6da785a9271.tar.gz
Queue index per vhost location
-rw-r--r--src/rabbit_msg_store_vhost_sup.erl7
-rw-r--r--src/rabbit_queue_index.erl34
-rw-r--r--src/rabbit_vhost.erl6
3 files changed, 24 insertions, 23 deletions
diff --git a/src/rabbit_msg_store_vhost_sup.erl b/src/rabbit_msg_store_vhost_sup.erl
index a33ffed560..834b9cfb04 100644
--- a/src/rabbit_msg_store_vhost_sup.erl
+++ b/src/rabbit_msg_store_vhost_sup.erl
@@ -42,19 +42,16 @@ maybe_start_vhost(Server, VHost) ->
VHostName.
vhost_store_name(Name, VHost) ->
- VhostEncoded = encode_vhost_name(VHost),
+ VhostEncoded = rabbit_vhost:dir(VHost),
binary_to_atom(<<(atom_to_binary(Name, utf8))/binary, "_",
VhostEncoded/binary>>,
utf8).
vhost_store_dir(VHost) ->
Dir = rabbit_mnesia:dir(),
- VhostEncoded = encode_vhost_name(VHost),
+ VhostEncoded = rabbit_vhost:dir(VHost),
binary_to_list(filename:join([Dir, VhostEncoded])).
-encode_vhost_name(VHost) ->
- base64:encode(VHost).
-
successfully_recovered_state(Name, VHost) ->
VHostName = vhost_store_name(Name, VHost),
rabbit_msg_store:successfully_recovered_state(VHostName).
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 3e9d92f5c1..a835bd4779 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -476,11 +476,10 @@ start(DurableQueueNames) ->
end, {[], sets:new()}, DurableQueueNames),
%% Any queue directory we've not been asked to recover is considered garbage
- QueuesDir = queues_dir(),
rabbit_file:recursive_delete(
- [filename:join(QueuesDir, DirName) ||
- DirName <- all_queue_directory_names(QueuesDir),
- not sets:is_element(DirName, DurableDirectories)]),
+ [DirName ||
+ DirName <- all_queue_directory_names(),
+ not sets:is_element(filename:basename(DirName), DurableDirectories)]),
rabbit_recovery_terms:clear(),
@@ -491,12 +490,9 @@ start(DurableQueueNames) ->
stop() -> rabbit_recovery_terms:stop().
-all_queue_directory_names(Dir) ->
- case rabbit_file:list_dir(Dir) of
- {ok, Entries} -> [E || E <- Entries,
- rabbit_file:is_dir(filename:join(Dir, E))];
- {error, enoent} -> []
- end.
+all_queue_directory_names() ->
+ QueuesBaseDir = queues_base_dir(),
+ filelib:wildcard(filename:join([QueuesBaseDir, "*", "queues", "*"])).
%%----------------------------------------------------------------------------
%% startup and shutdown
@@ -509,14 +505,18 @@ erase_index_dir(Dir) ->
end.
blank_state(QueueName) ->
- blank_state_dir(
- filename:join(queues_dir(), queue_name_to_dir_name(QueueName))).
+ blank_state_dir(queue_dir(QueueName)).
blank_state_dir(Dir) ->
blank_state_dir_funs(Dir,
fun (_) -> ok end,
fun (_) -> ok end).
+queue_dir(#resource{ virtual_host = VHost } = QueueName) ->
+ %% Queue directory is rabbit_mnesia_dir/:vhost/queues/:queue_id
+ filename:join([queues_base_dir(), rabbit_vhost:dir(VHost),
+ "queues", queue_name_to_dir_name(QueueName)]).
+
blank_state_dir_funs(Dir, OnSyncFun, OnSyncMsgFun) ->
{ok, MaxJournal} =
application:get_env(rabbit, queue_index_max_journal_entries),
@@ -630,8 +630,8 @@ queue_name_to_dir_name(Name = #resource { kind = queue }) ->
<<Num:128>> = erlang:md5(term_to_binary(Name)),
rabbit_misc:format("~.36B", [Num]).
-queues_dir() ->
- filename:join(rabbit_mnesia:dir(), "queues").
+queues_base_dir() ->
+ rabbit_mnesia:dir().
%%----------------------------------------------------------------------------
%% msg store startup delta function
@@ -1353,15 +1353,13 @@ store_msg_segment(_) ->
%%----------------------------------------------------------------------------
foreach_queue_index(Funs) ->
- QueuesDir = queues_dir(),
- QueueDirNames = all_queue_directory_names(QueuesDir),
+ QueueDirNames = all_queue_directory_names(),
{ok, Gatherer} = gatherer:start_link(),
[begin
ok = gatherer:fork(Gatherer),
ok = worker_pool:submit_async(
fun () ->
- transform_queue(filename:join(QueuesDir, QueueDirName),
- Gatherer, Funs)
+ transform_queue(QueueDirName, Gatherer, Funs)
end)
end || QueueDirName <- QueueDirNames],
empty = gatherer:out(Gatherer),
diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl
index 01f1046fb8..ddcd69049d 100644
--- a/src/rabbit_vhost.erl
+++ b/src/rabbit_vhost.erl
@@ -23,6 +23,7 @@
-export([add/1, delete/1, exists/1, list/0, with/2, assert/1, update/2,
set_limits/2, limits_of/1]).
-export([info/1, info/2, info_all/0, info_all/1, info_all/2, info_all/3]).
+-export([dir/1]).
-spec add(rabbit_types:vhost()) -> 'ok'.
@@ -93,6 +94,8 @@ delete(VHostPath) ->
with(VHostPath, fun () -> internal_delete(VHostPath) end)),
ok = rabbit_event:notify(vhost_deleted, [{name, VHostPath}]),
[ok = Fun() || Fun <- Funs],
+ rabbit_file:recursive_delete([filename:join(rabbit_mnesia:dir(),
+ VHostPath)]),
ok.
assert_benign(ok) -> ok;
@@ -185,3 +188,6 @@ info_all(Ref, AggregatorPid) -> info_all(?INFO_KEYS, Ref, AggregatorPid).
info_all(Items, Ref, AggregatorPid) ->
rabbit_control_misc:emitting_map(
AggregatorPid, Ref, fun(VHost) -> info(VHost, Items) end, list()).
+
+dir(Vhost) ->
+ base64:encode(Vhost). \ No newline at end of file