summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDaniil Fedotov <dfedotov@pivotal.io>2017-10-27 19:39:18 +0100
committerDaniil Fedotov <dfedotov@pivotal.io>2017-10-27 19:39:18 +0100
commit32c48cd7636885f27c24c60e648fd911ff8acd31 (patch)
tree03af2de8d0a4273413f48acaab261a5c8a343522
parent21879a91b19b4c959d7b66d73a31b638cb4f4dcf (diff)
parent7da27f3d3b80beddc1ac503c388df78972153ea0 (diff)
downloadrabbitmq-server-git-32c48cd7636885f27c24c60e648fd911ff8acd31.tar.gz
Merge branch 'stable'
-rw-r--r--src/rabbit.erl8
-rw-r--r--src/rabbit_amqqueue_process.erl18
-rw-r--r--test/clustering_management_SUITE.erl6
3 files changed, 23 insertions, 9 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index cc1e0e08c4..0d0ff2f9fc 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -23,6 +23,7 @@
status/0, is_running/0, alarms/0,
is_running/1, environment/0, rotate_logs/0, force_event_refresh/1,
start_fhc/0]).
+
-export([start/2, stop/1, prep_stop/1]).
-export([start_apps/1, start_apps/2, stop_apps/1]).
-export([log_locations/0, config_files/0, decrypt_config/2]). %% for testing and mgmt-agent
@@ -230,6 +231,7 @@
%%----------------------------------------------------------------------------
+-type restart_type() :: 'permanent' | 'transient' | 'temporary'.
%% this really should be an abstract type
-type log_location() :: string().
-type param() :: atom().
@@ -267,7 +269,7 @@
-spec recover() -> 'ok'.
-spec start_apps([app_name()]) -> 'ok'.
-spec start_apps([app_name()],
- #{app_name() => permanent|transient|temporary}) -> 'ok'.
+ #{app_name() => restart_type()}) -> 'ok'.
-spec stop_apps([app_name()]) -> 'ok'.
%%----------------------------------------------------------------------------
@@ -506,7 +508,7 @@ stop_and_halt() ->
start_apps(Apps) ->
start_apps(Apps, #{}).
-start_apps(Apps, AppModes) ->
+start_apps(Apps, RestartTypes) ->
app_utils:load_applications(Apps),
ConfigEntryDecoder = case application:get_env(rabbit, config_entry_decoder) of
@@ -547,7 +549,7 @@ start_apps(Apps, AppModes) ->
end,
ok = app_utils:start_applications(OrderedApps,
handle_app_error(could_not_start),
- AppModes).
+ RestartTypes).
%% This function retrieves the correct IoDevice for requesting
%% input. The problem with using the default IoDevice is that
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 80e36025d8..bdb7da1de2 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -22,6 +22,7 @@
-define(SYNC_INTERVAL, 200). %% milliseconds
-define(RAM_DURATION_UPDATE_INTERVAL, 5000).
+-define(CONSUMER_BIAS_RATIO, 2.0). %% i.e. consume 100% faster
-export([info_keys/0]).
@@ -1013,18 +1014,18 @@ emit_consumer_deleted(ChPid, ConsumerTag, QName, ActingUser) ->
%%----------------------------------------------------------------------------
-prioritise_call(Msg, _From, _Len, _State) ->
+prioritise_call(Msg, _From, _Len, State) ->
case Msg of
info -> 9;
{info, _Items} -> 9;
consumers -> 9;
stat -> 7;
- {basic_consume, _, _, _, _, _, _, _, _, _, _} -> 1;
- {basic_cancel, _, _, _} -> 1;
+ {basic_consume, _, _, _, _, _, _, _, _, _} -> consumer_bias(State, 0, 2);
+ {basic_cancel, _, _, _} -> consumer_bias(State, 0, 2);
_ -> 0
end.
-prioritise_cast(Msg, _Len, _State) ->
+prioritise_cast(Msg, _Len, State) ->
case Msg of
delete_immediately -> 8;
{delete_exclusive, _Pid} -> 8;
@@ -1033,7 +1034,7 @@ prioritise_cast(Msg, _Len, _State) ->
{run_backing_queue, _Mod, _Fun} -> 6;
{ack, _AckTags, _ChPid} -> 4; %% [1]
{resume, _ChPid} -> 3;
- {notify_sent, _ChPid, _Credit} -> 2;
+ {notify_sent, _ChPid, _Credit} -> consumer_bias(State, 0, 2);
_ -> 0
end.
@@ -1049,6 +1050,13 @@ prioritise_cast(Msg, _Len, _State) ->
%% credit to self is hard to reason about. Consumers can continue while
%% reduce_memory_use is in progress.
+consumer_bias(#q{backing_queue = BQ, backing_queue_state = BQS}, Low, High) ->
+ case BQ:msg_rates(BQS) of
+ {0.0, _} -> Low;
+ {Ingress, Egress} when Egress / Ingress < ?CONSUMER_BIAS_RATIO -> High;
+ {_, _} -> Low
+ end.
+
prioritise_info(Msg, _Len, #q{q = #amqqueue{exclusive_owner = DownPid}}) ->
case Msg of
{'DOWN', _, process, DownPid, _} -> 8;
diff --git a/test/clustering_management_SUITE.erl b/test/clustering_management_SUITE.erl
index 2a23c4997e..0bc1e4dea4 100644
--- a/test/clustering_management_SUITE.erl
+++ b/test/clustering_management_SUITE.erl
@@ -532,7 +532,9 @@ erlang_config(Config) ->
ok = start_app(Hare),
assert_clustered([Rabbit, Hare]),
- %% If we use an invalid node name, the node fails to start.
+ %% If we use an invalid node type, the node fails to start.
+ %% The Erlang VM has stopped after previous rabbit app failure
+ ok = rabbit_ct_broker_helpers:start_node(Config, Hare),
ok = stop_app(Hare),
ok = reset(Hare),
ok = rpc:call(Hare, application, set_env,
@@ -703,6 +705,8 @@ assert_failure(Fun) ->
{error, Reason} -> Reason;
{error_string, Reason} -> Reason;
{badrpc, {'EXIT', Reason}} -> Reason;
+ %% Failure to start an app result in node shutdown
+ {badrpc, nodedown} -> nodedown;
{badrpc_multi, Reason, _Nodes} -> Reason;
Other -> exit({expected_failure, Other})
end.