diff options
| author | GitHubPang <61439577+GitHubPang@users.noreply.github.com> | 2020-07-30 17:16:06 +0800 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2020-07-30 17:16:06 +0800 |
| commit | e47c8ed51b1ba925ebabbff86695f09df6c156d0 (patch) | |
| tree | 2759f4f388f6ebe49c25160f575a812ac1573847 /src | |
| parent | 1dc4e9bdb7c01d986f1d39bf50ce2f301c54525a (diff) | |
| parent | 4d6737e4501cb7a563f4ee4e090e1df580ec101d (diff) | |
| download | rabbitmq-server-git-e47c8ed51b1ba925ebabbff86695f09df6c156d0.tar.gz | |
Merge pull request #1 from rabbitmq/master
Merge from source
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit.erl | 33 | ||||
| -rw-r--r-- | src/rabbit_core_ff.erl | 31 | ||||
| -rw-r--r-- | src/rabbit_msg_store.erl | 23 | ||||
| -rw-r--r-- | src/rabbit_networking.erl | 17 | ||||
| -rw-r--r-- | src/rabbit_queue_index.erl | 39 |
5 files changed, 84 insertions, 59 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl index 8298dfe79d..f58ae1db94 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -684,7 +684,7 @@ status() -> {log_files, log_locations()}, {data_directory, rabbit_mnesia:dir()}, {raft_data_directory, ra_env:data_dir()}], - Totals = case rabbit:is_running() of + Totals = case is_running() of true -> [{virtual_host_count, rabbit_vhost:count()}, {connection_count, @@ -912,26 +912,31 @@ do_run_postlaunch_phase() -> end end, Plugins), + %% Successful boot resets node maintenance state. rabbit_log_prelaunch:info("Resetting node maintenance status"), - %% successful boot resets node maintenance state - rabbit_maintenance:unmark_as_being_drained(), - rabbit_log_prelaunch:debug("Marking ~s as running", [product_name()]), - rabbit_boot_state:set(ready), + _ = rabbit_maintenance:unmark_as_being_drained(), - ok = rabbit_lager:broker_is_started(), - ok = log_broker_started( - rabbit_plugins:strictly_plugins(rabbit_plugins:active())), - %% export definitions after all plugins have been enabled, + %% Export definitions after all plugins have been enabled, %% see rabbitmq/rabbitmq-server#2384 case rabbit_definitions:maybe_load_definitions() of - ok -> ok; + ok -> ok; DefLoadError -> throw(DefLoadError) end, - %% start listeners after all plugins have been enabled, - %% see rabbitmq/rabbitmq-server#2405 - rabbit_log_prelaunch:info("Ready to start client connection listeners"), - ok = rabbit_networking:boot() + %% Start listeners after all plugins have been enabled, + %% see rabbitmq/rabbitmq-server#2405. + rabbit_log_prelaunch:info( + "Ready to start client connection listeners"), + ok = rabbit_networking:boot(), + + %% The node is ready: mark it as such and log it. + %% NOTE: PLEASE DO NOT ADD CRITICAL NODE STARTUP CODE AFTER THIS. + ok = rabbit_lager:broker_is_started(), + ok = log_broker_started( + rabbit_plugins:strictly_plugins(rabbit_plugins:active())), + + rabbit_log_prelaunch:debug("Marking ~s as running", [product_name()]), + rabbit_boot_state:set(ready) catch throw:{error, _} = Error -> rabbit_prelaunch_errors:log_error(Error), diff --git a/src/rabbit_core_ff.erl b/src/rabbit_core_ff.erl index f97170f6ac..63aa9b30f1 100644 --- a/src/rabbit_core_ff.erl +++ b/src/rabbit_core_ff.erl @@ -36,12 +36,11 @@ }}). -rabbit_feature_flag( - {maintenance_mode_status, - #{ - desc => "Maintenance mode status", - stability => stable, - migration_fun => {?MODULE, maintenance_mode_status_migration} - }}). + {maintenance_mode_status, + #{desc => "Maintenance mode status", + stability => stable, + migration_fun => {?MODULE, maintenance_mode_status_migration} + }}). %% ------------------------------------------------------------------- %% Quorum queues. @@ -106,7 +105,6 @@ remove_explicit_default_bindings(FeatureName, Queues) -> || Q <- Queues], ok. - %% ------------------------------------------------------------------- %% Virtual host metadata. %% ------------------------------------------------------------------- @@ -122,19 +120,24 @@ virtual_host_metadata_migration(_FeatureName, _FeatureProps, enable) -> virtual_host_metadata_migration(_FeatureName, _FeatureProps, is_enabled) -> mnesia:table_info(rabbit_vhost, attributes) =:= vhost:fields(vhost_v2). - -%% -%% Maintenance mode -%% +%% ------------------------------------------------------------------- +%% Maintenance mode. +%% ------------------------------------------------------------------- maintenance_mode_status_migration(FeatureName, _FeatureProps, enable) -> TableName = rabbit_maintenance:status_table_name(), - rabbit_log:info("Creating table ~s for feature flag `~s`", [TableName, FeatureName]), + rabbit_log:info( + "Creating table ~s for feature flag `~s`", + [TableName, FeatureName]), try - _ = rabbit_table:create(TableName, rabbit_maintenance:status_table_definition()), + _ = rabbit_table:create( + TableName, + rabbit_maintenance:status_table_definition()), _ = rabbit_table:ensure_table_copy(TableName, node()) catch throw:Reason -> - rabbit_log:error("Failed to create maintenance status table: ~p", [Reason]) + rabbit_log:error( + "Failed to create maintenance status table: ~p", + [Reason]) end; maintenance_mode_status_migration(_FeatureName, _FeatureProps, is_enabled) -> rabbit_table:exists(rabbit_maintenance:status_table_name()). diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index a9719c44fa..4851e56248 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -17,6 +17,8 @@ -export([set_maximum_since_use/2, combine_files/3, delete_file/2]). %% internal +-export([scan_file_for_valid_messages/1]). %% salvage tool + -export([transform_dir/3, force_recovery/2]). %% upgrade -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, @@ -1392,12 +1394,15 @@ should_mask_action(CRef, MsgId, %% file helper functions %%---------------------------------------------------------------------------- -open_file(Dir, FileName, Mode) -> +open_file(File, Mode) -> file_handle_cache:open_with_absolute_path( - form_filename(Dir, FileName), ?BINARY_MODE ++ Mode, + File, ?BINARY_MODE ++ Mode, [{write_buffer, ?HANDLE_CACHE_BUFFER_SIZE}, {read_buffer, ?HANDLE_CACHE_BUFFER_SIZE}]). +open_file(Dir, FileName, Mode) -> + open_file(form_filename(Dir, FileName), Mode). + close_handle(Key, CState = #client_msstate { file_handle_cache = FHC }) -> CState #client_msstate { file_handle_cache = close_handle(Key, FHC) }; @@ -1687,18 +1692,22 @@ recover_crashed_compaction(Dir, TmpFileName, NonTmpRelatedFileName) -> ok = file_handle_cache:delete(TmpHdl), ok. -scan_file_for_valid_messages(Dir, FileName) -> - case open_file(Dir, FileName, ?READ_MODE) of +scan_file_for_valid_messages(File) -> + case open_file(File, ?READ_MODE) of {ok, Hdl} -> Valid = rabbit_msg_file:scan( - Hdl, filelib:file_size( - form_filename(Dir, FileName)), + Hdl, filelib:file_size(File), fun scan_fun/2, []), ok = file_handle_cache:close(Hdl), Valid; {error, enoent} -> {ok, [], 0}; - {error, Reason} -> {error, {unable_to_scan_file, FileName, Reason}} + {error, Reason} -> {error, {unable_to_scan_file, + filename:basename(File), + Reason}} end. +scan_file_for_valid_messages(Dir, FileName) -> + scan_file_for_valid_messages(form_filename(Dir, FileName)). + scan_fun({MsgId, TotalSize, Offset, _Msg}, Acc) -> [{MsgId, TotalSize, Offset} | Acc]. diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 966601d222..63760cd2e3 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -67,12 +67,7 @@ -type protocol() :: atom(). -type label() :: string(). -%% @todo Remove once Dialyzer only runs on Erlang/OTP 21.3 or above. --dialyzer({nowarn_function, boot/0}). --dialyzer({nowarn_function, boot_listeners/3}). --dialyzer({nowarn_function, record_distribution_listener/0}). - --spec boot() -> 'ok'. +-spec boot() -> 'ok' | no_return(). boot() -> ok = record_distribution_listener(), @@ -338,10 +333,16 @@ tcp_listener_stopped(Protocol, Opts, IPAddress, Port) -> port = Port, opts = Opts}). +-spec record_distribution_listener() -> ok | no_return(). + record_distribution_listener() -> {Name, Host} = rabbit_nodes:parts(node()), - {port, Port, _Version} = erl_epmd:port_please(Name, Host), - tcp_listener_started(clustering, [], {0,0,0,0,0,0,0,0}, Port). + case erl_epmd:port_please(Name, Host, infinity) of + {port, Port, _Version} -> + tcp_listener_started(clustering, [], {0,0,0,0,0,0,0,0}, Port); + noport -> + throw({error, no_epmd_port}) + end. -spec active_listeners() -> [rabbit_types:listener()]. diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index ece2d26d10..faab4380b5 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -14,7 +14,7 @@ read/3, next_segment_boundary/1, bounds/1, start/2, stop/1]). -export([add_queue_ttl/0, avoid_zeroes/0, store_msg_size/0, store_msg/0]). --export([scan_queue_segments/3]). +-export([scan_queue_segments/3, scan_queue_segments/4]). %% Migrates from global to per-vhost message stores -export([move_to_per_vhost_stores/1, @@ -255,8 +255,9 @@ -spec erase(rabbit_amqqueue:name()) -> 'ok'. -erase(Name) -> - #qistate { dir = Dir } = blank_state(Name), +erase(#resource{ virtual_host = VHost } = Name) -> + VHostDir = rabbit_vhost:msg_store_dir_path(VHost), + #qistate { dir = Dir } = blank_state(VHostDir, Name), erase_index_dir(Dir). %% used during variable queue purge when there are no pending acks @@ -278,8 +279,9 @@ reset_state(#qistate{ queue_name = Name, -spec init(rabbit_amqqueue:name(), on_sync_fun(), on_sync_fun()) -> qistate(). -init(Name, OnSyncFun, OnSyncMsgFun) -> - State = #qistate { dir = Dir } = blank_state(Name), +init(#resource{ virtual_host = VHost } = Name, OnSyncFun, OnSyncMsgFun) -> + VHostDir = rabbit_vhost:msg_store_dir_path(VHost), + State = #qistate { dir = Dir } = blank_state(VHostDir, Name), false = rabbit_file:is_file(Dir), %% is_file == is file or dir State#qistate{on_sync = OnSyncFun, on_sync_msg = OnSyncMsgFun}. @@ -290,9 +292,10 @@ init(Name, OnSyncFun, OnSyncMsgFun) -> {'undefined' | non_neg_integer(), 'undefined' | non_neg_integer(), qistate()}. -recover(Name, Terms, MsgStoreRecovered, ContainsCheckFun, - OnSyncFun, OnSyncMsgFun) -> - State = blank_state(Name), +recover(#resource{ virtual_host = VHost } = Name, Terms, MsgStoreRecovered, + ContainsCheckFun, OnSyncFun, OnSyncMsgFun) -> + VHostDir = rabbit_vhost:msg_store_dir_path(VHost), + State = blank_state(VHostDir, Name), State1 = State #qistate{on_sync = OnSyncFun, on_sync_msg = OnSyncMsgFun}, CleanShutdown = Terms /= non_clean_shutdown, @@ -549,17 +552,16 @@ erase_index_dir(Dir) -> false -> ok end. -blank_state(QueueName) -> - Dir = queue_dir(QueueName), +blank_state(VHostDir, QueueName) -> + Dir = queue_dir(VHostDir, QueueName), blank_state_name_dir_funs(QueueName, Dir, fun (_) -> ok end, fun (_) -> ok end). -queue_dir(#resource{ virtual_host = VHost } = QueueName) -> +queue_dir(VHostDir, QueueName) -> %% Queue directory is %% {node_database_dir}/msg_stores/vhosts/{vhost}/queues/{queue} - VHostDir = rabbit_vhost:msg_store_dir_path(VHost), QueueDir = queue_name_to_dir_name(QueueName), filename:join([VHostDir, "queues", QueueDir]). @@ -725,9 +727,13 @@ queue_index_walker_reader(QueueName, Gatherer) -> end, ok, QueueName), ok = gatherer:finish(Gatherer). -scan_queue_segments(Fun, Acc, QueueName) -> +scan_queue_segments(Fun, Acc, #resource{ virtual_host = VHost } = QueueName) -> + VHostDir = rabbit_vhost:msg_store_dir_path(VHost), + scan_queue_segments(Fun, Acc, VHostDir, QueueName). + +scan_queue_segments(Fun, Acc, VHostDir, QueueName) -> State = #qistate { segments = Segments, dir = Dir } = - recover_journal(blank_state(QueueName)), + recover_journal(blank_state(VHostDir, QueueName)), Result = lists:foldr( fun (Seg, AccN) -> segment_entries_foldr( @@ -1459,10 +1465,11 @@ drive_transform_fun(Fun, Hdl, Contents) -> drive_transform_fun(Fun, Hdl, Contents1) end. -move_to_per_vhost_stores(#resource{} = QueueName) -> +move_to_per_vhost_stores(#resource{virtual_host = VHost} = QueueName) -> OldQueueDir = filename:join([queues_base_dir(), "queues", queue_name_to_dir_name_legacy(QueueName)]), - NewQueueDir = queue_dir(QueueName), + VHostDir = rabbit_vhost:msg_store_dir_path(VHost), + NewQueueDir = queue_dir(VHostDir, QueueName), rabbit_log_upgrade:info("About to migrate queue directory '~s' to '~s'", [OldQueueDir, NewQueueDir]), case rabbit_file:is_dir(OldQueueDir) of |
