diff options
| author | Daniil Fedotov <dfedotov@pivotal.io> | 2016-10-19 11:16:34 +0100 |
|---|---|---|
| committer | Daniil Fedotov <dfedotov@pivotal.io> | 2016-10-20 14:49:52 +0100 |
| commit | e6c76fd377c92cd9c4e1cdd6ab6ed6da785a9271 (patch) | |
| tree | 52b8bee34ab6f0f2dbe40910697d3db030518f7b | |
| parent | fdd7b871df020cd138623bd8bffab7077440cbd5 (diff) | |
| download | rabbitmq-server-git-e6c76fd377c92cd9c4e1cdd6ab6ed6da785a9271.tar.gz | |
Queue index per vhost location
| -rw-r--r-- | src/rabbit_msg_store_vhost_sup.erl | 7 | ||||
| -rw-r--r-- | src/rabbit_queue_index.erl | 34 | ||||
| -rw-r--r-- | src/rabbit_vhost.erl | 6 |
3 files changed, 24 insertions, 23 deletions
diff --git a/src/rabbit_msg_store_vhost_sup.erl b/src/rabbit_msg_store_vhost_sup.erl index a33ffed560..834b9cfb04 100644 --- a/src/rabbit_msg_store_vhost_sup.erl +++ b/src/rabbit_msg_store_vhost_sup.erl @@ -42,19 +42,16 @@ maybe_start_vhost(Server, VHost) -> VHostName. vhost_store_name(Name, VHost) -> - VhostEncoded = encode_vhost_name(VHost), + VhostEncoded = rabbit_vhost:dir(VHost), binary_to_atom(<<(atom_to_binary(Name, utf8))/binary, "_", VhostEncoded/binary>>, utf8). vhost_store_dir(VHost) -> Dir = rabbit_mnesia:dir(), - VhostEncoded = encode_vhost_name(VHost), + VhostEncoded = rabbit_vhost:dir(VHost), binary_to_list(filename:join([Dir, VhostEncoded])). -encode_vhost_name(VHost) -> - base64:encode(VHost). - successfully_recovered_state(Name, VHost) -> VHostName = vhost_store_name(Name, VHost), rabbit_msg_store:successfully_recovered_state(VHostName). diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 3e9d92f5c1..a835bd4779 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -476,11 +476,10 @@ start(DurableQueueNames) -> end, {[], sets:new()}, DurableQueueNames), %% Any queue directory we've not been asked to recover is considered garbage - QueuesDir = queues_dir(), rabbit_file:recursive_delete( - [filename:join(QueuesDir, DirName) || - DirName <- all_queue_directory_names(QueuesDir), - not sets:is_element(DirName, DurableDirectories)]), + [DirName || + DirName <- all_queue_directory_names(), + not sets:is_element(filename:basename(DirName), DurableDirectories)]), rabbit_recovery_terms:clear(), @@ -491,12 +490,9 @@ start(DurableQueueNames) -> stop() -> rabbit_recovery_terms:stop(). -all_queue_directory_names(Dir) -> - case rabbit_file:list_dir(Dir) of - {ok, Entries} -> [E || E <- Entries, - rabbit_file:is_dir(filename:join(Dir, E))]; - {error, enoent} -> [] - end. +all_queue_directory_names() -> + QueuesBaseDir = queues_base_dir(), + filelib:wildcard(filename:join([QueuesBaseDir, "*", "queues", "*"])). %%---------------------------------------------------------------------------- %% startup and shutdown @@ -509,14 +505,18 @@ erase_index_dir(Dir) -> end. blank_state(QueueName) -> - blank_state_dir( - filename:join(queues_dir(), queue_name_to_dir_name(QueueName))). + blank_state_dir(queue_dir(QueueName)). blank_state_dir(Dir) -> blank_state_dir_funs(Dir, fun (_) -> ok end, fun (_) -> ok end). +queue_dir(#resource{ virtual_host = VHost } = QueueName) -> + %% Queue directory is rabbit_mnesia_dir/:vhost/queues/:queue_id + filename:join([queues_base_dir(), rabbit_vhost:dir(VHost), + "queues", queue_name_to_dir_name(QueueName)]). + blank_state_dir_funs(Dir, OnSyncFun, OnSyncMsgFun) -> {ok, MaxJournal} = application:get_env(rabbit, queue_index_max_journal_entries), @@ -630,8 +630,8 @@ queue_name_to_dir_name(Name = #resource { kind = queue }) -> <<Num:128>> = erlang:md5(term_to_binary(Name)), rabbit_misc:format("~.36B", [Num]). -queues_dir() -> - filename:join(rabbit_mnesia:dir(), "queues"). +queues_base_dir() -> + rabbit_mnesia:dir(). %%---------------------------------------------------------------------------- %% msg store startup delta function @@ -1353,15 +1353,13 @@ store_msg_segment(_) -> %%---------------------------------------------------------------------------- foreach_queue_index(Funs) -> - QueuesDir = queues_dir(), - QueueDirNames = all_queue_directory_names(QueuesDir), + QueueDirNames = all_queue_directory_names(), {ok, Gatherer} = gatherer:start_link(), [begin ok = gatherer:fork(Gatherer), ok = worker_pool:submit_async( fun () -> - transform_queue(filename:join(QueuesDir, QueueDirName), - Gatherer, Funs) + transform_queue(QueueDirName, Gatherer, Funs) end) end || QueueDirName <- QueueDirNames], empty = gatherer:out(Gatherer), diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl index 01f1046fb8..ddcd69049d 100644 --- a/src/rabbit_vhost.erl +++ b/src/rabbit_vhost.erl @@ -23,6 +23,7 @@ -export([add/1, delete/1, exists/1, list/0, with/2, assert/1, update/2, set_limits/2, limits_of/1]). -export([info/1, info/2, info_all/0, info_all/1, info_all/2, info_all/3]). +-export([dir/1]). -spec add(rabbit_types:vhost()) -> 'ok'. @@ -93,6 +94,8 @@ delete(VHostPath) -> with(VHostPath, fun () -> internal_delete(VHostPath) end)), ok = rabbit_event:notify(vhost_deleted, [{name, VHostPath}]), [ok = Fun() || Fun <- Funs], + rabbit_file:recursive_delete([filename:join(rabbit_mnesia:dir(), + VHostPath)]), ok. assert_benign(ok) -> ok; @@ -185,3 +188,6 @@ info_all(Ref, AggregatorPid) -> info_all(?INFO_KEYS, Ref, AggregatorPid). info_all(Items, Ref, AggregatorPid) -> rabbit_control_misc:emitting_map( AggregatorPid, Ref, fun(VHost) -> info(VHost, Items) end, list()). + +dir(Vhost) -> + base64:encode(Vhost).
\ No newline at end of file |
