summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit.erl33
-rw-r--r--src/rabbit_core_ff.erl31
-rw-r--r--src/rabbit_msg_store.erl23
-rw-r--r--src/rabbit_networking.erl17
-rw-r--r--src/rabbit_queue_index.erl39
5 files changed, 84 insertions, 59 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 8298dfe79d..f58ae1db94 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -684,7 +684,7 @@ status() ->
{log_files, log_locations()},
{data_directory, rabbit_mnesia:dir()},
{raft_data_directory, ra_env:data_dir()}],
- Totals = case rabbit:is_running() of
+ Totals = case is_running() of
true ->
[{virtual_host_count, rabbit_vhost:count()},
{connection_count,
@@ -912,26 +912,31 @@ do_run_postlaunch_phase() ->
end
end, Plugins),
+ %% Successful boot resets node maintenance state.
rabbit_log_prelaunch:info("Resetting node maintenance status"),
- %% successful boot resets node maintenance state
- rabbit_maintenance:unmark_as_being_drained(),
- rabbit_log_prelaunch:debug("Marking ~s as running", [product_name()]),
- rabbit_boot_state:set(ready),
+ _ = rabbit_maintenance:unmark_as_being_drained(),
- ok = rabbit_lager:broker_is_started(),
- ok = log_broker_started(
- rabbit_plugins:strictly_plugins(rabbit_plugins:active())),
- %% export definitions after all plugins have been enabled,
+ %% Export definitions after all plugins have been enabled,
%% see rabbitmq/rabbitmq-server#2384
case rabbit_definitions:maybe_load_definitions() of
- ok -> ok;
+ ok -> ok;
DefLoadError -> throw(DefLoadError)
end,
- %% start listeners after all plugins have been enabled,
- %% see rabbitmq/rabbitmq-server#2405
- rabbit_log_prelaunch:info("Ready to start client connection listeners"),
- ok = rabbit_networking:boot()
+ %% Start listeners after all plugins have been enabled,
+ %% see rabbitmq/rabbitmq-server#2405.
+ rabbit_log_prelaunch:info(
+ "Ready to start client connection listeners"),
+ ok = rabbit_networking:boot(),
+
+ %% The node is ready: mark it as such and log it.
+ %% NOTE: PLEASE DO NOT ADD CRITICAL NODE STARTUP CODE AFTER THIS.
+ ok = rabbit_lager:broker_is_started(),
+ ok = log_broker_started(
+ rabbit_plugins:strictly_plugins(rabbit_plugins:active())),
+
+ rabbit_log_prelaunch:debug("Marking ~s as running", [product_name()]),
+ rabbit_boot_state:set(ready)
catch
throw:{error, _} = Error ->
rabbit_prelaunch_errors:log_error(Error),
diff --git a/src/rabbit_core_ff.erl b/src/rabbit_core_ff.erl
index f97170f6ac..63aa9b30f1 100644
--- a/src/rabbit_core_ff.erl
+++ b/src/rabbit_core_ff.erl
@@ -36,12 +36,11 @@
}}).
-rabbit_feature_flag(
- {maintenance_mode_status,
- #{
- desc => "Maintenance mode status",
- stability => stable,
- migration_fun => {?MODULE, maintenance_mode_status_migration}
- }}).
+ {maintenance_mode_status,
+ #{desc => "Maintenance mode status",
+ stability => stable,
+ migration_fun => {?MODULE, maintenance_mode_status_migration}
+ }}).
%% -------------------------------------------------------------------
%% Quorum queues.
@@ -106,7 +105,6 @@ remove_explicit_default_bindings(FeatureName, Queues) ->
|| Q <- Queues],
ok.
-
%% -------------------------------------------------------------------
%% Virtual host metadata.
%% -------------------------------------------------------------------
@@ -122,19 +120,24 @@ virtual_host_metadata_migration(_FeatureName, _FeatureProps, enable) ->
virtual_host_metadata_migration(_FeatureName, _FeatureProps, is_enabled) ->
mnesia:table_info(rabbit_vhost, attributes) =:= vhost:fields(vhost_v2).
-
-%%
-%% Maintenance mode
-%%
+%% -------------------------------------------------------------------
+%% Maintenance mode.
+%% -------------------------------------------------------------------
maintenance_mode_status_migration(FeatureName, _FeatureProps, enable) ->
TableName = rabbit_maintenance:status_table_name(),
- rabbit_log:info("Creating table ~s for feature flag `~s`", [TableName, FeatureName]),
+ rabbit_log:info(
+ "Creating table ~s for feature flag `~s`",
+ [TableName, FeatureName]),
try
- _ = rabbit_table:create(TableName, rabbit_maintenance:status_table_definition()),
+ _ = rabbit_table:create(
+ TableName,
+ rabbit_maintenance:status_table_definition()),
_ = rabbit_table:ensure_table_copy(TableName, node())
catch throw:Reason ->
- rabbit_log:error("Failed to create maintenance status table: ~p", [Reason])
+ rabbit_log:error(
+ "Failed to create maintenance status table: ~p",
+ [Reason])
end;
maintenance_mode_status_migration(_FeatureName, _FeatureProps, is_enabled) ->
rabbit_table:exists(rabbit_maintenance:status_table_name()).
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index a9719c44fa..4851e56248 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -17,6 +17,8 @@
-export([set_maximum_since_use/2, combine_files/3,
delete_file/2]). %% internal
+-export([scan_file_for_valid_messages/1]). %% salvage tool
+
-export([transform_dir/3, force_recovery/2]). %% upgrade
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
@@ -1392,12 +1394,15 @@ should_mask_action(CRef, MsgId,
%% file helper functions
%%----------------------------------------------------------------------------
-open_file(Dir, FileName, Mode) ->
+open_file(File, Mode) ->
file_handle_cache:open_with_absolute_path(
- form_filename(Dir, FileName), ?BINARY_MODE ++ Mode,
+ File, ?BINARY_MODE ++ Mode,
[{write_buffer, ?HANDLE_CACHE_BUFFER_SIZE},
{read_buffer, ?HANDLE_CACHE_BUFFER_SIZE}]).
+open_file(Dir, FileName, Mode) ->
+ open_file(form_filename(Dir, FileName), Mode).
+
close_handle(Key, CState = #client_msstate { file_handle_cache = FHC }) ->
CState #client_msstate { file_handle_cache = close_handle(Key, FHC) };
@@ -1687,18 +1692,22 @@ recover_crashed_compaction(Dir, TmpFileName, NonTmpRelatedFileName) ->
ok = file_handle_cache:delete(TmpHdl),
ok.
-scan_file_for_valid_messages(Dir, FileName) ->
- case open_file(Dir, FileName, ?READ_MODE) of
+scan_file_for_valid_messages(File) ->
+ case open_file(File, ?READ_MODE) of
{ok, Hdl} -> Valid = rabbit_msg_file:scan(
- Hdl, filelib:file_size(
- form_filename(Dir, FileName)),
+ Hdl, filelib:file_size(File),
fun scan_fun/2, []),
ok = file_handle_cache:close(Hdl),
Valid;
{error, enoent} -> {ok, [], 0};
- {error, Reason} -> {error, {unable_to_scan_file, FileName, Reason}}
+ {error, Reason} -> {error, {unable_to_scan_file,
+ filename:basename(File),
+ Reason}}
end.
+scan_file_for_valid_messages(Dir, FileName) ->
+ scan_file_for_valid_messages(form_filename(Dir, FileName)).
+
scan_fun({MsgId, TotalSize, Offset, _Msg}, Acc) ->
[{MsgId, TotalSize, Offset} | Acc].
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 966601d222..63760cd2e3 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -67,12 +67,7 @@
-type protocol() :: atom().
-type label() :: string().
-%% @todo Remove once Dialyzer only runs on Erlang/OTP 21.3 or above.
--dialyzer({nowarn_function, boot/0}).
--dialyzer({nowarn_function, boot_listeners/3}).
--dialyzer({nowarn_function, record_distribution_listener/0}).
-
--spec boot() -> 'ok'.
+-spec boot() -> 'ok' | no_return().
boot() ->
ok = record_distribution_listener(),
@@ -338,10 +333,16 @@ tcp_listener_stopped(Protocol, Opts, IPAddress, Port) ->
port = Port,
opts = Opts}).
+-spec record_distribution_listener() -> ok | no_return().
+
record_distribution_listener() ->
{Name, Host} = rabbit_nodes:parts(node()),
- {port, Port, _Version} = erl_epmd:port_please(Name, Host),
- tcp_listener_started(clustering, [], {0,0,0,0,0,0,0,0}, Port).
+ case erl_epmd:port_please(Name, Host, infinity) of
+ {port, Port, _Version} ->
+ tcp_listener_started(clustering, [], {0,0,0,0,0,0,0,0}, Port);
+ noport ->
+ throw({error, no_epmd_port})
+ end.
-spec active_listeners() -> [rabbit_types:listener()].
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index ece2d26d10..faab4380b5 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -14,7 +14,7 @@
read/3, next_segment_boundary/1, bounds/1, start/2, stop/1]).
-export([add_queue_ttl/0, avoid_zeroes/0, store_msg_size/0, store_msg/0]).
--export([scan_queue_segments/3]).
+-export([scan_queue_segments/3, scan_queue_segments/4]).
%% Migrates from global to per-vhost message stores
-export([move_to_per_vhost_stores/1,
@@ -255,8 +255,9 @@
-spec erase(rabbit_amqqueue:name()) -> 'ok'.
-erase(Name) ->
- #qistate { dir = Dir } = blank_state(Name),
+erase(#resource{ virtual_host = VHost } = Name) ->
+ VHostDir = rabbit_vhost:msg_store_dir_path(VHost),
+ #qistate { dir = Dir } = blank_state(VHostDir, Name),
erase_index_dir(Dir).
%% used during variable queue purge when there are no pending acks
@@ -278,8 +279,9 @@ reset_state(#qistate{ queue_name = Name,
-spec init(rabbit_amqqueue:name(),
on_sync_fun(), on_sync_fun()) -> qistate().
-init(Name, OnSyncFun, OnSyncMsgFun) ->
- State = #qistate { dir = Dir } = blank_state(Name),
+init(#resource{ virtual_host = VHost } = Name, OnSyncFun, OnSyncMsgFun) ->
+ VHostDir = rabbit_vhost:msg_store_dir_path(VHost),
+ State = #qistate { dir = Dir } = blank_state(VHostDir, Name),
false = rabbit_file:is_file(Dir), %% is_file == is file or dir
State#qistate{on_sync = OnSyncFun,
on_sync_msg = OnSyncMsgFun}.
@@ -290,9 +292,10 @@ init(Name, OnSyncFun, OnSyncMsgFun) ->
{'undefined' | non_neg_integer(),
'undefined' | non_neg_integer(), qistate()}.
-recover(Name, Terms, MsgStoreRecovered, ContainsCheckFun,
- OnSyncFun, OnSyncMsgFun) ->
- State = blank_state(Name),
+recover(#resource{ virtual_host = VHost } = Name, Terms, MsgStoreRecovered,
+ ContainsCheckFun, OnSyncFun, OnSyncMsgFun) ->
+ VHostDir = rabbit_vhost:msg_store_dir_path(VHost),
+ State = blank_state(VHostDir, Name),
State1 = State #qistate{on_sync = OnSyncFun,
on_sync_msg = OnSyncMsgFun},
CleanShutdown = Terms /= non_clean_shutdown,
@@ -549,17 +552,16 @@ erase_index_dir(Dir) ->
false -> ok
end.
-blank_state(QueueName) ->
- Dir = queue_dir(QueueName),
+blank_state(VHostDir, QueueName) ->
+ Dir = queue_dir(VHostDir, QueueName),
blank_state_name_dir_funs(QueueName,
Dir,
fun (_) -> ok end,
fun (_) -> ok end).
-queue_dir(#resource{ virtual_host = VHost } = QueueName) ->
+queue_dir(VHostDir, QueueName) ->
%% Queue directory is
%% {node_database_dir}/msg_stores/vhosts/{vhost}/queues/{queue}
- VHostDir = rabbit_vhost:msg_store_dir_path(VHost),
QueueDir = queue_name_to_dir_name(QueueName),
filename:join([VHostDir, "queues", QueueDir]).
@@ -725,9 +727,13 @@ queue_index_walker_reader(QueueName, Gatherer) ->
end, ok, QueueName),
ok = gatherer:finish(Gatherer).
-scan_queue_segments(Fun, Acc, QueueName) ->
+scan_queue_segments(Fun, Acc, #resource{ virtual_host = VHost } = QueueName) ->
+ VHostDir = rabbit_vhost:msg_store_dir_path(VHost),
+ scan_queue_segments(Fun, Acc, VHostDir, QueueName).
+
+scan_queue_segments(Fun, Acc, VHostDir, QueueName) ->
State = #qistate { segments = Segments, dir = Dir } =
- recover_journal(blank_state(QueueName)),
+ recover_journal(blank_state(VHostDir, QueueName)),
Result = lists:foldr(
fun (Seg, AccN) ->
segment_entries_foldr(
@@ -1459,10 +1465,11 @@ drive_transform_fun(Fun, Hdl, Contents) ->
drive_transform_fun(Fun, Hdl, Contents1)
end.
-move_to_per_vhost_stores(#resource{} = QueueName) ->
+move_to_per_vhost_stores(#resource{virtual_host = VHost} = QueueName) ->
OldQueueDir = filename:join([queues_base_dir(), "queues",
queue_name_to_dir_name_legacy(QueueName)]),
- NewQueueDir = queue_dir(QueueName),
+ VHostDir = rabbit_vhost:msg_store_dir_path(VHost),
+ NewQueueDir = queue_dir(VHostDir, QueueName),
rabbit_log_upgrade:info("About to migrate queue directory '~s' to '~s'",
[OldQueueDir, NewQueueDir]),
case rabbit_file:is_dir(OldQueueDir) of