diff options
| author | Daniil Fedotov <dfedotov@pivotal.io> | 2017-06-07 11:28:46 +0100 |
|---|---|---|
| committer | Daniil Fedotov <dfedotov@pivotal.io> | 2017-06-13 12:08:30 +0100 |
| commit | 5c69639b5d15556fe3271640fcfba1688d4984f7 (patch) | |
| tree | 5eb298a3b7680cdb52882e3a93847e1d172f93e6 /src | |
| parent | 35f43082a1ab0d3ce49ab8958e980fb81b55f159 (diff) | |
| download | rabbitmq-server-git-5c69639b5d15556fe3271640fcfba1688d4984f7.tar.gz | |
Change directory name generation function for queue indexes and vhosts.
It was a mistake to relate on md5(term_to_binary(..)) to generate
vhost and queue directory name, since term_to_binary format can
change.
Migration functions take care of renaming directories.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_queue_index.erl | 111 | ||||
| -rw-r--r-- | src/rabbit_vhost.erl | 2 |
2 files changed, 66 insertions, 47 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 123bfaba25..7416ede4b8 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -116,6 +116,7 @@ %% ---- Journal details ---- -define(JOURNAL_FILENAME, "journal.jif"). +-define(QUEUE_NAME_STUB_FILE, ".queue_name"). -define(PUB_PERSIST_JPREFIX, 2#00). -define(PUB_TRANS_JPREFIX, 2#01). @@ -204,7 +205,9 @@ %% optimisation pre_publish_cache, %% optimisation - delivered_cache}). + delivered_cache, + %% queue name resource record + queue_name}). -record(segment, { %% segment ID (an integer) @@ -295,7 +298,8 @@ erase(Name) -> erase_index_dir(Dir). %% used during variable queue purge when there are no pending acks -reset_state(#qistate{ dir = Dir, +reset_state(#qistate{ queue_name = Name, + dir = Dir, on_sync = OnSyncFun, on_sync_msg = OnSyncMsgFun, journal_handle = JournalHdl }) -> @@ -304,7 +308,7 @@ reset_state(#qistate{ dir = Dir, _ -> file_handle_cache:close(JournalHdl) end, ok = erase_index_dir(Dir), - blank_state_dir_funs(Dir, OnSyncFun, OnSyncMsgFun). + blank_state_name_dir_funs(Name, Dir, OnSyncFun, OnSyncMsgFun). init(Name, OnSyncFun, OnSyncMsgFun) -> State = #qistate { dir = Dir } = blank_state(Name), @@ -520,32 +524,6 @@ start(VHost, DurableQueueNames) -> {OrderedTerms, {fun queue_index_walker/1, {start, DurableQueueNames}}}. -read_global_recovery_terms(DurableQueueNames) -> - ok = rabbit_recovery_terms:open_global_table(), - - DurableTerms = - lists:foldl( - fun(QName, RecoveryTerms) -> - DirName = queue_name_to_dir_name(QName), - RecoveryInfo = case rabbit_recovery_terms:read_global(DirName) of - {error, _} -> non_clean_shutdown; - {ok, Terms} -> Terms - end, - [RecoveryInfo | RecoveryTerms] - end, [], DurableQueueNames), - - ok = rabbit_recovery_terms:close_global_table(), - %% The backing queue interface requires that the queue recovery terms - %% which come back from start/1 are in the same order as DurableQueueNames - OrderedTerms = lists:reverse(DurableTerms), - {OrderedTerms, {fun queue_index_walker/1, {start, DurableQueueNames}}}. - -cleanup_global_recovery_terms() -> - rabbit_file:recursive_delete([filename:join([queues_base_dir(), "queues"])]), - rabbit_recovery_terms:delete_global_table(), - ok. - - stop(VHost) -> rabbit_recovery_terms:stop(VHost). all_queue_directory_names(VHost) -> @@ -567,10 +545,9 @@ erase_index_dir(Dir) -> end. blank_state(QueueName) -> - blank_state_dir(queue_dir(QueueName)). - -blank_state_dir(Dir) -> - blank_state_dir_funs(Dir, + Dir = queue_dir(QueueName), + blank_state_name_dir_funs(QueueName, + Dir, fun (_) -> ok end, fun (_) -> ok end). @@ -581,7 +558,20 @@ queue_dir(#resource{ virtual_host = VHost } = QueueName) -> QueueDir = queue_name_to_dir_name(QueueName), filename:join([VHostDir, "queues", QueueDir]). -blank_state_dir_funs(Dir, OnSyncFun, OnSyncMsgFun) -> +queue_name_to_dir_name(#resource { kind = queue, + virtual_host = VHost, + name = QName }) -> + <<Num:128>> = erlang:md5(<<"queue", VHost/binary, QName/binary>>), + rabbit_misc:format("~.36B", [Num]). + +queue_name_to_dir_name_legacy(Name = #resource { kind = queue }) -> + <<Num:128>> = erlang:md5(term_to_binary_compat:queue_name_to_binary(Name)), + rabbit_misc:format("~.36B", [Num]). + +queues_base_dir() -> + rabbit_mnesia:dir(). + +blank_state_name_dir_funs(Name, Dir, OnSyncFun, OnSyncMsgFun) -> {ok, MaxJournal} = application:get_env(rabbit, queue_index_max_journal_entries), #qistate { dir = Dir, @@ -594,7 +584,8 @@ blank_state_dir_funs(Dir, OnSyncFun, OnSyncMsgFun) -> unconfirmed = gb_sets:new(), unconfirmed_msg = gb_sets:new(), pre_publish_cache = [], - delivered_cache = [] }. + delivered_cache = [], + queue_name = Name }. init_clean(RecoveredCounts, State) -> %% Load the journal. Since this is a clean recovery this (almost) @@ -690,13 +681,6 @@ recover_message(false, _, no_del, RelSeq, {Segment, DirtyCount}) -> add_to_journal(RelSeq, del, Segment)), DirtyCount + 2}. -queue_name_to_dir_name(Name = #resource { kind = queue }) -> - <<Num:128>> = erlang:md5(term_to_binary_compat:queue_name_to_binary(Name)), - rabbit_misc:format("~.36B", [Num]). - -queues_base_dir() -> - rabbit_mnesia:dir(). - %%---------------------------------------------------------------------------- %% msg store startup delta function %%---------------------------------------------------------------------------- @@ -890,9 +874,11 @@ append_journal_to_segment(#segment { journal_entries = JEntries, end. get_journal_handle(State = #qistate { journal_handle = undefined, - dir = Dir }) -> + dir = Dir, + queue_name = Name }) -> Path = filename:join(Dir, ?JOURNAL_FILENAME), ok = rabbit_file:ensure_dir(Path), + ok = ensure_queue_name_stub_file(Dir, Name), {ok, Hdl} = file_handle_cache:open_with_absolute_path( Path, ?WRITE_MODE, [{write_buffer, infinity}]), {Hdl, State #qistate { journal_handle = Hdl }}; @@ -1413,7 +1399,8 @@ store_msg_segment(_) -> - +%%---------------------------------------------------------------------------- +%% Migration functions %%---------------------------------------------------------------------------- foreach_queue_index(Funs) -> @@ -1467,18 +1454,50 @@ drive_transform_fun(Fun, Hdl, Contents) -> move_to_per_vhost_stores(#resource{} = QueueName) -> OldQueueDir = filename:join([queues_base_dir(), "queues", - queue_name_to_dir_name(QueueName)]), + queue_name_to_dir_name_legacy(QueueName)]), NewQueueDir = queue_dir(QueueName), case rabbit_file:is_dir(OldQueueDir) of true -> ok = rabbit_file:ensure_dir(NewQueueDir), - ok = rabbit_file:rename(OldQueueDir, NewQueueDir); + ok = rabbit_file:rename(OldQueueDir, NewQueueDir), + ok = ensure_queue_name_stub_file(NewQueueDir, QueueName); false -> rabbit_log:info("Queue index directory not found for queue ~p~n", [QueueName]) end, ok. +ensure_queue_name_stub_file(Dir, #resource{virtual_host = VHost, name = QName}) -> + QueueNameFile = filename:join(Dir, ?QUEUE_NAME_STUB_FILE), + file:write_file(QueueNameFile, <<"VHOST: ", VHost/binary, "\n", + "QUEUE: ", QName/binary, "\n">>). + +read_global_recovery_terms(DurableQueueNames) -> + ok = rabbit_recovery_terms:open_global_table(), + + DurableTerms = + lists:foldl( + fun(QName, RecoveryTerms) -> + DirName = queue_name_to_dir_name_legacy(QName), + RecoveryInfo = case rabbit_recovery_terms:read_global(DirName) of + {error, _} -> non_clean_shutdown; + {ok, Terms} -> Terms + end, + [RecoveryInfo | RecoveryTerms] + end, [], DurableQueueNames), + + ok = rabbit_recovery_terms:close_global_table(), + %% The backing queue interface requires that the queue recovery terms + %% which come back from start/1 are in the same order as DurableQueueNames + OrderedTerms = lists:reverse(DurableTerms), + {OrderedTerms, {fun queue_index_walker/1, {start, DurableQueueNames}}}. + +cleanup_global_recovery_terms() -> + rabbit_file:recursive_delete([filename:join([queues_base_dir(), "queues"])]), + rabbit_recovery_terms:delete_global_table(), + ok. + + update_recovery_term(#resource{virtual_host = VHost} = QueueName, Term) -> Key = queue_name_to_dir_name(QueueName), rabbit_recovery_terms:store(VHost, Key, Term). diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl index 1d1ea16cca..c712015170 100644 --- a/src/rabbit_vhost.erl +++ b/src/rabbit_vhost.erl @@ -213,7 +213,7 @@ set_limits(VHost = #vhost{}, Limits) -> dir(Vhost) -> - <<Num:128>> = erlang:md5(term_to_binary(Vhost)), + <<Num:128>> = erlang:md5(Vhost), rabbit_misc:format("~.36B", [Num]). msg_store_dir_path(VHost) -> |
