summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_queue_index.erl111
-rw-r--r--src/rabbit_vhost.erl2
2 files changed, 66 insertions, 47 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 123bfaba25..7416ede4b8 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -116,6 +116,7 @@
%% ---- Journal details ----
-define(JOURNAL_FILENAME, "journal.jif").
+-define(QUEUE_NAME_STUB_FILE, ".queue_name").
-define(PUB_PERSIST_JPREFIX, 2#00).
-define(PUB_TRANS_JPREFIX, 2#01).
@@ -204,7 +205,9 @@
%% optimisation
pre_publish_cache,
%% optimisation
- delivered_cache}).
+ delivered_cache,
+ %% queue name resource record
+ queue_name}).
-record(segment, {
%% segment ID (an integer)
@@ -295,7 +298,8 @@ erase(Name) ->
erase_index_dir(Dir).
%% used during variable queue purge when there are no pending acks
-reset_state(#qistate{ dir = Dir,
+reset_state(#qistate{ queue_name = Name,
+ dir = Dir,
on_sync = OnSyncFun,
on_sync_msg = OnSyncMsgFun,
journal_handle = JournalHdl }) ->
@@ -304,7 +308,7 @@ reset_state(#qistate{ dir = Dir,
_ -> file_handle_cache:close(JournalHdl)
end,
ok = erase_index_dir(Dir),
- blank_state_dir_funs(Dir, OnSyncFun, OnSyncMsgFun).
+ blank_state_name_dir_funs(Name, Dir, OnSyncFun, OnSyncMsgFun).
init(Name, OnSyncFun, OnSyncMsgFun) ->
State = #qistate { dir = Dir } = blank_state(Name),
@@ -520,32 +524,6 @@ start(VHost, DurableQueueNames) ->
{OrderedTerms, {fun queue_index_walker/1, {start, DurableQueueNames}}}.
-read_global_recovery_terms(DurableQueueNames) ->
- ok = rabbit_recovery_terms:open_global_table(),
-
- DurableTerms =
- lists:foldl(
- fun(QName, RecoveryTerms) ->
- DirName = queue_name_to_dir_name(QName),
- RecoveryInfo = case rabbit_recovery_terms:read_global(DirName) of
- {error, _} -> non_clean_shutdown;
- {ok, Terms} -> Terms
- end,
- [RecoveryInfo | RecoveryTerms]
- end, [], DurableQueueNames),
-
- ok = rabbit_recovery_terms:close_global_table(),
- %% The backing queue interface requires that the queue recovery terms
- %% which come back from start/1 are in the same order as DurableQueueNames
- OrderedTerms = lists:reverse(DurableTerms),
- {OrderedTerms, {fun queue_index_walker/1, {start, DurableQueueNames}}}.
-
-cleanup_global_recovery_terms() ->
- rabbit_file:recursive_delete([filename:join([queues_base_dir(), "queues"])]),
- rabbit_recovery_terms:delete_global_table(),
- ok.
-
-
stop(VHost) -> rabbit_recovery_terms:stop(VHost).
all_queue_directory_names(VHost) ->
@@ -567,10 +545,9 @@ erase_index_dir(Dir) ->
end.
blank_state(QueueName) ->
- blank_state_dir(queue_dir(QueueName)).
-
-blank_state_dir(Dir) ->
- blank_state_dir_funs(Dir,
+ Dir = queue_dir(QueueName),
+ blank_state_name_dir_funs(QueueName,
+ Dir,
fun (_) -> ok end,
fun (_) -> ok end).
@@ -581,7 +558,20 @@ queue_dir(#resource{ virtual_host = VHost } = QueueName) ->
QueueDir = queue_name_to_dir_name(QueueName),
filename:join([VHostDir, "queues", QueueDir]).
-blank_state_dir_funs(Dir, OnSyncFun, OnSyncMsgFun) ->
+queue_name_to_dir_name(#resource { kind = queue,
+ virtual_host = VHost,
+ name = QName }) ->
+ <<Num:128>> = erlang:md5(<<"queue", VHost/binary, QName/binary>>),
+ rabbit_misc:format("~.36B", [Num]).
+
+queue_name_to_dir_name_legacy(Name = #resource { kind = queue }) ->
+ <<Num:128>> = erlang:md5(term_to_binary_compat:queue_name_to_binary(Name)),
+ rabbit_misc:format("~.36B", [Num]).
+
+queues_base_dir() ->
+ rabbit_mnesia:dir().
+
+blank_state_name_dir_funs(Name, Dir, OnSyncFun, OnSyncMsgFun) ->
{ok, MaxJournal} =
application:get_env(rabbit, queue_index_max_journal_entries),
#qistate { dir = Dir,
@@ -594,7 +584,8 @@ blank_state_dir_funs(Dir, OnSyncFun, OnSyncMsgFun) ->
unconfirmed = gb_sets:new(),
unconfirmed_msg = gb_sets:new(),
pre_publish_cache = [],
- delivered_cache = [] }.
+ delivered_cache = [],
+ queue_name = Name }.
init_clean(RecoveredCounts, State) ->
%% Load the journal. Since this is a clean recovery this (almost)
@@ -690,13 +681,6 @@ recover_message(false, _, no_del, RelSeq, {Segment, DirtyCount}) ->
add_to_journal(RelSeq, del, Segment)),
DirtyCount + 2}.
-queue_name_to_dir_name(Name = #resource { kind = queue }) ->
- <<Num:128>> = erlang:md5(term_to_binary_compat:queue_name_to_binary(Name)),
- rabbit_misc:format("~.36B", [Num]).
-
-queues_base_dir() ->
- rabbit_mnesia:dir().
-
%%----------------------------------------------------------------------------
%% msg store startup delta function
%%----------------------------------------------------------------------------
@@ -890,9 +874,11 @@ append_journal_to_segment(#segment { journal_entries = JEntries,
end.
get_journal_handle(State = #qistate { journal_handle = undefined,
- dir = Dir }) ->
+ dir = Dir,
+ queue_name = Name }) ->
Path = filename:join(Dir, ?JOURNAL_FILENAME),
ok = rabbit_file:ensure_dir(Path),
+ ok = ensure_queue_name_stub_file(Dir, Name),
{ok, Hdl} = file_handle_cache:open_with_absolute_path(
Path, ?WRITE_MODE, [{write_buffer, infinity}]),
{Hdl, State #qistate { journal_handle = Hdl }};
@@ -1413,7 +1399,8 @@ store_msg_segment(_) ->
-
+%%----------------------------------------------------------------------------
+%% Migration functions
%%----------------------------------------------------------------------------
foreach_queue_index(Funs) ->
@@ -1467,18 +1454,50 @@ drive_transform_fun(Fun, Hdl, Contents) ->
move_to_per_vhost_stores(#resource{} = QueueName) ->
OldQueueDir = filename:join([queues_base_dir(), "queues",
- queue_name_to_dir_name(QueueName)]),
+ queue_name_to_dir_name_legacy(QueueName)]),
NewQueueDir = queue_dir(QueueName),
case rabbit_file:is_dir(OldQueueDir) of
true ->
ok = rabbit_file:ensure_dir(NewQueueDir),
- ok = rabbit_file:rename(OldQueueDir, NewQueueDir);
+ ok = rabbit_file:rename(OldQueueDir, NewQueueDir),
+ ok = ensure_queue_name_stub_file(NewQueueDir, QueueName);
false ->
rabbit_log:info("Queue index directory not found for queue ~p~n",
[QueueName])
end,
ok.
+ensure_queue_name_stub_file(Dir, #resource{virtual_host = VHost, name = QName}) ->
+ QueueNameFile = filename:join(Dir, ?QUEUE_NAME_STUB_FILE),
+ file:write_file(QueueNameFile, <<"VHOST: ", VHost/binary, "\n",
+ "QUEUE: ", QName/binary, "\n">>).
+
+read_global_recovery_terms(DurableQueueNames) ->
+ ok = rabbit_recovery_terms:open_global_table(),
+
+ DurableTerms =
+ lists:foldl(
+ fun(QName, RecoveryTerms) ->
+ DirName = queue_name_to_dir_name_legacy(QName),
+ RecoveryInfo = case rabbit_recovery_terms:read_global(DirName) of
+ {error, _} -> non_clean_shutdown;
+ {ok, Terms} -> Terms
+ end,
+ [RecoveryInfo | RecoveryTerms]
+ end, [], DurableQueueNames),
+
+ ok = rabbit_recovery_terms:close_global_table(),
+ %% The backing queue interface requires that the queue recovery terms
+ %% which come back from start/1 are in the same order as DurableQueueNames
+ OrderedTerms = lists:reverse(DurableTerms),
+ {OrderedTerms, {fun queue_index_walker/1, {start, DurableQueueNames}}}.
+
+cleanup_global_recovery_terms() ->
+ rabbit_file:recursive_delete([filename:join([queues_base_dir(), "queues"])]),
+ rabbit_recovery_terms:delete_global_table(),
+ ok.
+
+
update_recovery_term(#resource{virtual_host = VHost} = QueueName, Term) ->
Key = queue_name_to_dir_name(QueueName),
rabbit_recovery_terms:store(VHost, Key, Term).
diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl
index 1d1ea16cca..c712015170 100644
--- a/src/rabbit_vhost.erl
+++ b/src/rabbit_vhost.erl
@@ -213,7 +213,7 @@ set_limits(VHost = #vhost{}, Limits) ->
dir(Vhost) ->
- <<Num:128>> = erlang:md5(term_to_binary(Vhost)),
+ <<Num:128>> = erlang:md5(Vhost),
rabbit_misc:format("~.36B", [Num]).
msg_store_dir_path(VHost) ->