diff options
| author | Daniil Fedotov <dfedotov@pivotal.io> | 2017-10-27 19:39:18 +0100 |
|---|---|---|
| committer | Daniil Fedotov <dfedotov@pivotal.io> | 2017-10-27 19:39:18 +0100 |
| commit | 32c48cd7636885f27c24c60e648fd911ff8acd31 (patch) | |
| tree | 03af2de8d0a4273413f48acaab261a5c8a343522 | |
| parent | 21879a91b19b4c959d7b66d73a31b638cb4f4dcf (diff) | |
| parent | 7da27f3d3b80beddc1ac503c388df78972153ea0 (diff) | |
| download | rabbitmq-server-git-32c48cd7636885f27c24c60e648fd911ff8acd31.tar.gz | |
Merge branch 'stable'
| -rw-r--r-- | src/rabbit.erl | 8 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 18 | ||||
| -rw-r--r-- | test/clustering_management_SUITE.erl | 6 |
3 files changed, 23 insertions, 9 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl index cc1e0e08c4..0d0ff2f9fc 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -23,6 +23,7 @@ status/0, is_running/0, alarms/0, is_running/1, environment/0, rotate_logs/0, force_event_refresh/1, start_fhc/0]). + -export([start/2, stop/1, prep_stop/1]). -export([start_apps/1, start_apps/2, stop_apps/1]). -export([log_locations/0, config_files/0, decrypt_config/2]). %% for testing and mgmt-agent @@ -230,6 +231,7 @@ %%---------------------------------------------------------------------------- +-type restart_type() :: 'permanent' | 'transient' | 'temporary'. %% this really should be an abstract type -type log_location() :: string(). -type param() :: atom(). @@ -267,7 +269,7 @@ -spec recover() -> 'ok'. -spec start_apps([app_name()]) -> 'ok'. -spec start_apps([app_name()], - #{app_name() => permanent|transient|temporary}) -> 'ok'. + #{app_name() => restart_type()}) -> 'ok'. -spec stop_apps([app_name()]) -> 'ok'. %%---------------------------------------------------------------------------- @@ -506,7 +508,7 @@ stop_and_halt() -> start_apps(Apps) -> start_apps(Apps, #{}). -start_apps(Apps, AppModes) -> +start_apps(Apps, RestartTypes) -> app_utils:load_applications(Apps), ConfigEntryDecoder = case application:get_env(rabbit, config_entry_decoder) of @@ -547,7 +549,7 @@ start_apps(Apps, AppModes) -> end, ok = app_utils:start_applications(OrderedApps, handle_app_error(could_not_start), - AppModes). + RestartTypes). %% This function retrieves the correct IoDevice for requesting %% input. The problem with using the default IoDevice is that diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 80e36025d8..bdb7da1de2 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -22,6 +22,7 @@ -define(SYNC_INTERVAL, 200). %% milliseconds -define(RAM_DURATION_UPDATE_INTERVAL, 5000). +-define(CONSUMER_BIAS_RATIO, 2.0). %% i.e. consume 100% faster -export([info_keys/0]). @@ -1013,18 +1014,18 @@ emit_consumer_deleted(ChPid, ConsumerTag, QName, ActingUser) -> %%---------------------------------------------------------------------------- -prioritise_call(Msg, _From, _Len, _State) -> +prioritise_call(Msg, _From, _Len, State) -> case Msg of info -> 9; {info, _Items} -> 9; consumers -> 9; stat -> 7; - {basic_consume, _, _, _, _, _, _, _, _, _, _} -> 1; - {basic_cancel, _, _, _} -> 1; + {basic_consume, _, _, _, _, _, _, _, _, _} -> consumer_bias(State, 0, 2); + {basic_cancel, _, _, _} -> consumer_bias(State, 0, 2); _ -> 0 end. -prioritise_cast(Msg, _Len, _State) -> +prioritise_cast(Msg, _Len, State) -> case Msg of delete_immediately -> 8; {delete_exclusive, _Pid} -> 8; @@ -1033,7 +1034,7 @@ prioritise_cast(Msg, _Len, _State) -> {run_backing_queue, _Mod, _Fun} -> 6; {ack, _AckTags, _ChPid} -> 4; %% [1] {resume, _ChPid} -> 3; - {notify_sent, _ChPid, _Credit} -> 2; + {notify_sent, _ChPid, _Credit} -> consumer_bias(State, 0, 2); _ -> 0 end. @@ -1049,6 +1050,13 @@ prioritise_cast(Msg, _Len, _State) -> %% credit to self is hard to reason about. Consumers can continue while %% reduce_memory_use is in progress. +consumer_bias(#q{backing_queue = BQ, backing_queue_state = BQS}, Low, High) -> + case BQ:msg_rates(BQS) of + {0.0, _} -> Low; + {Ingress, Egress} when Egress / Ingress < ?CONSUMER_BIAS_RATIO -> High; + {_, _} -> Low + end. + prioritise_info(Msg, _Len, #q{q = #amqqueue{exclusive_owner = DownPid}}) -> case Msg of {'DOWN', _, process, DownPid, _} -> 8; diff --git a/test/clustering_management_SUITE.erl b/test/clustering_management_SUITE.erl index 2a23c4997e..0bc1e4dea4 100644 --- a/test/clustering_management_SUITE.erl +++ b/test/clustering_management_SUITE.erl @@ -532,7 +532,9 @@ erlang_config(Config) -> ok = start_app(Hare), assert_clustered([Rabbit, Hare]), - %% If we use an invalid node name, the node fails to start. + %% If we use an invalid node type, the node fails to start. + %% The Erlang VM has stopped after previous rabbit app failure + ok = rabbit_ct_broker_helpers:start_node(Config, Hare), ok = stop_app(Hare), ok = reset(Hare), ok = rpc:call(Hare, application, set_env, @@ -703,6 +705,8 @@ assert_failure(Fun) -> {error, Reason} -> Reason; {error_string, Reason} -> Reason; {badrpc, {'EXIT', Reason}} -> Reason; + %% Failure to start an app result in node shutdown + {badrpc, nodedown} -> nodedown; {badrpc_multi, Reason, _Nodes} -> Reason; Other -> exit({expected_failure, Other}) end. |
