diff options
| -rw-r--r-- | src/rabbit_msg_store.erl | 23 | ||||
| -rw-r--r-- | src/rabbit_queue_index.erl | 39 |
2 files changed, 39 insertions, 23 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index e04ba8f139..fdcc929bd7 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -26,6 +26,8 @@ -export([set_maximum_since_use/2, combine_files/3, delete_file/2]). %% internal +-export([scan_file_for_valid_messages/1]). %% salvage tool + -export([transform_dir/3, force_recovery/2]). %% upgrade -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, @@ -1401,12 +1403,15 @@ should_mask_action(CRef, MsgId, %% file helper functions %%---------------------------------------------------------------------------- -open_file(Dir, FileName, Mode) -> +open_file(File, Mode) -> file_handle_cache:open_with_absolute_path( - form_filename(Dir, FileName), ?BINARY_MODE ++ Mode, + File, ?BINARY_MODE ++ Mode, [{write_buffer, ?HANDLE_CACHE_BUFFER_SIZE}, {read_buffer, ?HANDLE_CACHE_BUFFER_SIZE}]). +open_file(Dir, FileName, Mode) -> + open_file(form_filename(Dir, FileName), Mode). + close_handle(Key, CState = #client_msstate { file_handle_cache = FHC }) -> CState #client_msstate { file_handle_cache = close_handle(Key, FHC) }; @@ -1696,18 +1701,22 @@ recover_crashed_compaction(Dir, TmpFileName, NonTmpRelatedFileName) -> ok = file_handle_cache:delete(TmpHdl), ok. -scan_file_for_valid_messages(Dir, FileName) -> - case open_file(Dir, FileName, ?READ_MODE) of +scan_file_for_valid_messages(File) -> + case open_file(File, ?READ_MODE) of {ok, Hdl} -> Valid = rabbit_msg_file:scan( - Hdl, filelib:file_size( - form_filename(Dir, FileName)), + Hdl, filelib:file_size(File), fun scan_fun/2, []), ok = file_handle_cache:close(Hdl), Valid; {error, enoent} -> {ok, [], 0}; - {error, Reason} -> {error, {unable_to_scan_file, FileName, Reason}} + {error, Reason} -> {error, {unable_to_scan_file, + filename:basename(File), + Reason}} end. +scan_file_for_valid_messages(Dir, FileName) -> + scan_file_for_valid_messages(form_filename(Dir, FileName)). + scan_fun({MsgId, TotalSize, Offset, _Msg}, Acc) -> [{MsgId, TotalSize, Offset} | Acc]. diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index bd959e553a..c4cc62ac58 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -23,7 +23,7 @@ read/3, next_segment_boundary/1, bounds/1, start/2, stop/1]). -export([add_queue_ttl/0, avoid_zeroes/0, store_msg_size/0, store_msg/0]). --export([scan_queue_segments/3]). +-export([scan_queue_segments/3, scan_queue_segments/4]). %% Migrates from global to per-vhost message stores -export([move_to_per_vhost_stores/1, @@ -264,8 +264,9 @@ -spec erase(rabbit_amqqueue:name()) -> 'ok'. -erase(Name) -> - #qistate { dir = Dir } = blank_state(Name), +erase(#resource{ virtual_host = VHost } = Name) -> + VHostDir = rabbit_vhost:msg_store_dir_path(VHost), + #qistate { dir = Dir } = blank_state(VHostDir, Name), erase_index_dir(Dir). %% used during variable queue purge when there are no pending acks @@ -287,8 +288,9 @@ reset_state(#qistate{ queue_name = Name, -spec init(rabbit_amqqueue:name(), on_sync_fun(), on_sync_fun()) -> qistate(). -init(Name, OnSyncFun, OnSyncMsgFun) -> - State = #qistate { dir = Dir } = blank_state(Name), +init(#resource{ virtual_host = VHost } = Name, OnSyncFun, OnSyncMsgFun) -> + VHostDir = rabbit_vhost:msg_store_dir_path(VHost), + State = #qistate { dir = Dir } = blank_state(VHostDir, Name), false = rabbit_file:is_file(Dir), %% is_file == is file or dir State#qistate{on_sync = OnSyncFun, on_sync_msg = OnSyncMsgFun}. @@ -299,9 +301,10 @@ init(Name, OnSyncFun, OnSyncMsgFun) -> {'undefined' | non_neg_integer(), 'undefined' | non_neg_integer(), qistate()}. -recover(Name, Terms, MsgStoreRecovered, ContainsCheckFun, - OnSyncFun, OnSyncMsgFun) -> - State = blank_state(Name), +recover(#resource{ virtual_host = VHost } = Name, Terms, MsgStoreRecovered, + ContainsCheckFun, OnSyncFun, OnSyncMsgFun) -> + VHostDir = rabbit_vhost:msg_store_dir_path(VHost), + State = blank_state(VHostDir, Name), State1 = State #qistate{on_sync = OnSyncFun, on_sync_msg = OnSyncMsgFun}, CleanShutdown = Terms /= non_clean_shutdown, @@ -558,17 +561,16 @@ erase_index_dir(Dir) -> false -> ok end. -blank_state(QueueName) -> - Dir = queue_dir(QueueName), +blank_state(VHostDir, QueueName) -> + Dir = queue_dir(VHostDir, QueueName), blank_state_name_dir_funs(QueueName, Dir, fun (_) -> ok end, fun (_) -> ok end). -queue_dir(#resource{ virtual_host = VHost } = QueueName) -> +queue_dir(VHostDir, QueueName) -> %% Queue directory is %% {node_database_dir}/msg_stores/vhosts/{vhost}/queues/{queue} - VHostDir = rabbit_vhost:msg_store_dir_path(VHost), QueueDir = queue_name_to_dir_name(QueueName), filename:join([VHostDir, "queues", QueueDir]). @@ -734,9 +736,13 @@ queue_index_walker_reader(QueueName, Gatherer) -> end, ok, QueueName), ok = gatherer:finish(Gatherer). -scan_queue_segments(Fun, Acc, QueueName) -> +scan_queue_segments(Fun, Acc, #resource{ virtual_host = VHost } = QueueName) -> + VHostDir = rabbit_vhost:msg_store_dir_path(VHost), + scan_queue_segments(Fun, Acc, VHostDir, QueueName). + +scan_queue_segments(Fun, Acc, VHostDir, QueueName) -> State = #qistate { segments = Segments, dir = Dir } = - recover_journal(blank_state(QueueName)), + recover_journal(blank_state(VHostDir, QueueName)), Result = lists:foldr( fun (Seg, AccN) -> segment_entries_foldr( @@ -1468,10 +1474,11 @@ drive_transform_fun(Fun, Hdl, Contents) -> drive_transform_fun(Fun, Hdl, Contents1) end. -move_to_per_vhost_stores(#resource{} = QueueName) -> +move_to_per_vhost_stores(#resource{virtual_host = VHost} = QueueName) -> OldQueueDir = filename:join([queues_base_dir(), "queues", queue_name_to_dir_name_legacy(QueueName)]), - NewQueueDir = queue_dir(QueueName), + VHostDir = rabbit_vhost:msg_store_dir_path(VHost), + NewQueueDir = queue_dir(VHostDir, QueueName), rabbit_log_upgrade:info("About to migrate queue directory '~s' to '~s'", [OldQueueDir, NewQueueDir]), case rabbit_file:is_dir(OldQueueDir) of |
