summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <michael@clojurewerkz.org>2016-02-16 10:29:26 +0300
committerMichael Klishin <michael@clojurewerkz.org>2016-02-16 10:29:26 +0300
commit113bc016ac222151b11e1e110bdbca10d23d56b6 (patch)
tree8ee393450a1a139c6770a57f13aafcd1c4d8c729
parentfbb49b43fd6a3472fffb6ad7f0ecea94a4727926 (diff)
parent25a2d83fbd2952f5cb0e9f8840d3379f11ee402c (diff)
downloadrabbitmq-server-git-113bc016ac222151b11e1e110bdbca10d23d56b6.tar.gz
Merge branch 'master' into rabbitmq-server-550
-rw-r--r--.gitignore2
-rw-r--r--scripts/rabbitmq-env8
-rw-r--r--scripts/rabbitmq-script-wrapper7
-rw-r--r--src/rabbit_alarm.erl2
-rw-r--r--src/rabbit_mirror_queue_sync.erl59
5 files changed, 62 insertions, 16 deletions
diff --git a/.gitignore b/.gitignore
index 4a613c6d54..9e0bfad27d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -42,3 +42,5 @@ rabbitmq-server-*.zip
# Tracing tools
*-ttb
*.ti
+
+PACKAGES/*
diff --git a/scripts/rabbitmq-env b/scripts/rabbitmq-env
index ad06839bfc..9a2b7acc3c 100644
--- a/scripts/rabbitmq-env
+++ b/scripts/rabbitmq-env
@@ -103,15 +103,11 @@ fi
##--- Set environment vars RABBITMQ_<var_name> to defaults if not set
-SED_OPT="-E"
-if [ $(uname -s) = "Linux" ]; then
- SED_OPT="-r"
-fi
-
rmq_normalize_path() {
local path=$1
- echo "$path" | sed $SED_OPT -e 's,//+,/,g' -e 's,(.)/$,\1,'
+ # Remove redundant slashes and strip a trailing slash
+ echo "$path" | sed -e 's#/\{2,\}#/#g' -e 's#/$##'
}
rmq_normalize_path_var() {
diff --git a/scripts/rabbitmq-script-wrapper b/scripts/rabbitmq-script-wrapper
index ed4c276e53..9623f01709 100644
--- a/scripts/rabbitmq-script-wrapper
+++ b/scripts/rabbitmq-script-wrapper
@@ -15,14 +15,9 @@
## Copyright (c) 2007-2015 Pivotal Software, Inc. All rights reserved.
##
-SED_OPT="-E"
-if [ $(uname -s) = "Linux" ]; then
- SED_OPT="-r"
-fi
-
for arg in "$@" ; do
# Wrap each arg in single quotes and wrap single quotes in double quotes, so that they're passed through cleanly.
- arg=`printf %s "$arg" | sed $SED_OPT -e "s/'/'\"'\"'/g"`
+ arg=`printf %s "$arg" | sed -e "s#'#'\"'\"'#g"`
CMDLINE="${CMDLINE} '${arg}'"
done
diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl
index c11b6e4383..30743ea243 100644
--- a/src/rabbit_alarm.erl
+++ b/src/rabbit_alarm.erl
@@ -50,7 +50,7 @@
alarms :: [alarm()]}).
-type(local_alarm() :: 'file_descriptor_limit').
--type(resource_alarm_source() :: 'disk' | 'node').
+-type(resource_alarm_source() :: 'disk' | 'memory').
-type(resource_alarm() :: {resource_limit, resource_alarm_source(), node()}).
-type(alarm() :: local_alarm() | resource_alarm()).
diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl
index 2d8bdfa860..a97a9b50c8 100644
--- a/src/rabbit_mirror_queue_sync.erl
+++ b/src/rabbit_mirror_queue_sync.erl
@@ -18,7 +18,7 @@
-include("rabbit.hrl").
--export([master_prepare/4, master_go/8, slave/7]).
+-export([master_prepare/4, master_go/8, slave/7, conserve_resources/3]).
-define(SYNC_PROGRESS_INTERVAL, 1000000).
@@ -198,7 +198,7 @@ syncer(Ref, Log, MPid, SPids) ->
[] -> Log("all slaves already synced", []);
SPids1 -> MPid ! {ready, self()},
Log("mirrors ~p to sync", [[node(SPid) || SPid <- SPids1]]),
- syncer_loop(Ref, MPid, SPids1)
+ syncer_check_resources(Ref, MPid, SPids1)
end.
await_slaves(Ref, SPids) ->
@@ -217,12 +217,43 @@ await_slaves(Ref, SPids) ->
%% 'sync_start' and so will not reply. We need to act as though they are
%% down.
+syncer_check_resources(Ref, MPid, SPids) ->
+ rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}),
+ %% Before we ask the master node to send the first batch of messages
+ %% over here, we check if one node is already short on memory. If
+ %% that's the case, we wait for the alarm to be cleared before
+ %% starting the syncer loop.
+ AlarmedNodes = lists:any(
+ fun
+ ({{resource_limit, memory, _}, _}) -> true;
+ ({_, _}) -> false
+ end, rabbit_alarm:get_alarms()),
+ if
+ not AlarmedNodes ->
+ MPid ! {next, Ref},
+ syncer_loop(Ref, MPid, SPids);
+ true ->
+ case wait_for_resources(Ref, SPids) of
+ cancel -> ok;
+ SPids1 -> MPid ! {next, Ref},
+ syncer_loop(Ref, MPid, SPids1)
+ end
+ end.
+
syncer_loop(Ref, MPid, SPids) ->
- MPid ! {next, Ref},
receive
+ {conserve_resources, memory, true} ->
+ case wait_for_resources(Ref, SPids) of
+ cancel -> ok;
+ SPids1 -> syncer_loop(Ref, MPid, SPids1)
+ end;
+ {conserve_resources, _, _} ->
+ %% Ignore other alerts.
+ syncer_loop(Ref, MPid, SPids);
{msgs, Ref, Msgs} ->
SPids1 = wait_for_credit(SPids),
broadcast(SPids1, {sync_msgs, Ref, Msgs}),
+ MPid ! {next, Ref},
syncer_loop(Ref, MPid, SPids1);
{cancel, Ref} ->
%% We don't tell the slaves we will die - so when we do
@@ -239,6 +270,10 @@ broadcast(SPids, Msg) ->
SPid ! Msg
end || SPid <- SPids].
+conserve_resources(Pid, Source, {_, Conserve, _}) ->
+ Pid ! {conserve_resources, Source, Conserve},
+ ok.
+
wait_for_credit(SPids) ->
case credit_flow:blocked() of
true -> receive
@@ -252,6 +287,24 @@ wait_for_credit(SPids) ->
false -> SPids
end.
+wait_for_resources(Ref, SPids) ->
+ receive
+ {conserve_resources, memory, false} ->
+ SPids;
+ {conserve_resources, _, _} ->
+ %% Ignore other alerts.
+ wait_for_resources(Ref, SPids);
+ {cancel, Ref} ->
+ %% We don't tell the slaves we will die - so when we do
+ %% they interpret that as a failure, which is what we
+ %% want.
+ cancel;
+ {'DOWN', _, process, SPid, _} ->
+ credit_flow:peer_down(SPid),
+ SPids1 = wait_for_credit(lists:delete(SPid, SPids)),
+ wait_for_resources(Ref, SPids1)
+ end.
+
%% Syncer
%% ---------------------------------------------------------------------------
%% Slave