summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_msg_store.erl23
-rw-r--r--src/rabbit_queue_index.erl39
2 files changed, 39 insertions, 23 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index e04ba8f139..fdcc929bd7 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -26,6 +26,8 @@
-export([set_maximum_since_use/2, combine_files/3,
delete_file/2]). %% internal
+-export([scan_file_for_valid_messages/1]). %% salvage tool
+
-export([transform_dir/3, force_recovery/2]). %% upgrade
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
@@ -1401,12 +1403,15 @@ should_mask_action(CRef, MsgId,
%% file helper functions
%%----------------------------------------------------------------------------
-open_file(Dir, FileName, Mode) ->
+open_file(File, Mode) ->
file_handle_cache:open_with_absolute_path(
- form_filename(Dir, FileName), ?BINARY_MODE ++ Mode,
+ File, ?BINARY_MODE ++ Mode,
[{write_buffer, ?HANDLE_CACHE_BUFFER_SIZE},
{read_buffer, ?HANDLE_CACHE_BUFFER_SIZE}]).
+open_file(Dir, FileName, Mode) ->
+ open_file(form_filename(Dir, FileName), Mode).
+
close_handle(Key, CState = #client_msstate { file_handle_cache = FHC }) ->
CState #client_msstate { file_handle_cache = close_handle(Key, FHC) };
@@ -1696,18 +1701,22 @@ recover_crashed_compaction(Dir, TmpFileName, NonTmpRelatedFileName) ->
ok = file_handle_cache:delete(TmpHdl),
ok.
-scan_file_for_valid_messages(Dir, FileName) ->
- case open_file(Dir, FileName, ?READ_MODE) of
+scan_file_for_valid_messages(File) ->
+ case open_file(File, ?READ_MODE) of
{ok, Hdl} -> Valid = rabbit_msg_file:scan(
- Hdl, filelib:file_size(
- form_filename(Dir, FileName)),
+ Hdl, filelib:file_size(File),
fun scan_fun/2, []),
ok = file_handle_cache:close(Hdl),
Valid;
{error, enoent} -> {ok, [], 0};
- {error, Reason} -> {error, {unable_to_scan_file, FileName, Reason}}
+ {error, Reason} -> {error, {unable_to_scan_file,
+ filename:basename(File),
+ Reason}}
end.
+scan_file_for_valid_messages(Dir, FileName) ->
+ scan_file_for_valid_messages(form_filename(Dir, FileName)).
+
scan_fun({MsgId, TotalSize, Offset, _Msg}, Acc) ->
[{MsgId, TotalSize, Offset} | Acc].
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index bd959e553a..c4cc62ac58 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -23,7 +23,7 @@
read/3, next_segment_boundary/1, bounds/1, start/2, stop/1]).
-export([add_queue_ttl/0, avoid_zeroes/0, store_msg_size/0, store_msg/0]).
--export([scan_queue_segments/3]).
+-export([scan_queue_segments/3, scan_queue_segments/4]).
%% Migrates from global to per-vhost message stores
-export([move_to_per_vhost_stores/1,
@@ -264,8 +264,9 @@
-spec erase(rabbit_amqqueue:name()) -> 'ok'.
-erase(Name) ->
- #qistate { dir = Dir } = blank_state(Name),
+erase(#resource{ virtual_host = VHost } = Name) ->
+ VHostDir = rabbit_vhost:msg_store_dir_path(VHost),
+ #qistate { dir = Dir } = blank_state(VHostDir, Name),
erase_index_dir(Dir).
%% used during variable queue purge when there are no pending acks
@@ -287,8 +288,9 @@ reset_state(#qistate{ queue_name = Name,
-spec init(rabbit_amqqueue:name(),
on_sync_fun(), on_sync_fun()) -> qistate().
-init(Name, OnSyncFun, OnSyncMsgFun) ->
- State = #qistate { dir = Dir } = blank_state(Name),
+init(#resource{ virtual_host = VHost } = Name, OnSyncFun, OnSyncMsgFun) ->
+ VHostDir = rabbit_vhost:msg_store_dir_path(VHost),
+ State = #qistate { dir = Dir } = blank_state(VHostDir, Name),
false = rabbit_file:is_file(Dir), %% is_file == is file or dir
State#qistate{on_sync = OnSyncFun,
on_sync_msg = OnSyncMsgFun}.
@@ -299,9 +301,10 @@ init(Name, OnSyncFun, OnSyncMsgFun) ->
{'undefined' | non_neg_integer(),
'undefined' | non_neg_integer(), qistate()}.
-recover(Name, Terms, MsgStoreRecovered, ContainsCheckFun,
- OnSyncFun, OnSyncMsgFun) ->
- State = blank_state(Name),
+recover(#resource{ virtual_host = VHost } = Name, Terms, MsgStoreRecovered,
+ ContainsCheckFun, OnSyncFun, OnSyncMsgFun) ->
+ VHostDir = rabbit_vhost:msg_store_dir_path(VHost),
+ State = blank_state(VHostDir, Name),
State1 = State #qistate{on_sync = OnSyncFun,
on_sync_msg = OnSyncMsgFun},
CleanShutdown = Terms /= non_clean_shutdown,
@@ -558,17 +561,16 @@ erase_index_dir(Dir) ->
false -> ok
end.
-blank_state(QueueName) ->
- Dir = queue_dir(QueueName),
+blank_state(VHostDir, QueueName) ->
+ Dir = queue_dir(VHostDir, QueueName),
blank_state_name_dir_funs(QueueName,
Dir,
fun (_) -> ok end,
fun (_) -> ok end).
-queue_dir(#resource{ virtual_host = VHost } = QueueName) ->
+queue_dir(VHostDir, QueueName) ->
%% Queue directory is
%% {node_database_dir}/msg_stores/vhosts/{vhost}/queues/{queue}
- VHostDir = rabbit_vhost:msg_store_dir_path(VHost),
QueueDir = queue_name_to_dir_name(QueueName),
filename:join([VHostDir, "queues", QueueDir]).
@@ -734,9 +736,13 @@ queue_index_walker_reader(QueueName, Gatherer) ->
end, ok, QueueName),
ok = gatherer:finish(Gatherer).
-scan_queue_segments(Fun, Acc, QueueName) ->
+scan_queue_segments(Fun, Acc, #resource{ virtual_host = VHost } = QueueName) ->
+ VHostDir = rabbit_vhost:msg_store_dir_path(VHost),
+ scan_queue_segments(Fun, Acc, VHostDir, QueueName).
+
+scan_queue_segments(Fun, Acc, VHostDir, QueueName) ->
State = #qistate { segments = Segments, dir = Dir } =
- recover_journal(blank_state(QueueName)),
+ recover_journal(blank_state(VHostDir, QueueName)),
Result = lists:foldr(
fun (Seg, AccN) ->
segment_entries_foldr(
@@ -1468,10 +1474,11 @@ drive_transform_fun(Fun, Hdl, Contents) ->
drive_transform_fun(Fun, Hdl, Contents1)
end.
-move_to_per_vhost_stores(#resource{} = QueueName) ->
+move_to_per_vhost_stores(#resource{virtual_host = VHost} = QueueName) ->
OldQueueDir = filename:join([queues_base_dir(), "queues",
queue_name_to_dir_name_legacy(QueueName)]),
- NewQueueDir = queue_dir(QueueName),
+ VHostDir = rabbit_vhost:msg_store_dir_path(VHost),
+ NewQueueDir = queue_dir(VHostDir, QueueName),
rabbit_log_upgrade:info("About to migrate queue directory '~s' to '~s'",
[OldQueueDir, NewQueueDir]),
case rabbit_file:is_dir(OldQueueDir) of