diff options
| author | Michael Klishin <michael@clojurewerkz.org> | 2016-02-16 10:29:26 +0300 |
|---|---|---|
| committer | Michael Klishin <michael@clojurewerkz.org> | 2016-02-16 10:29:26 +0300 |
| commit | 113bc016ac222151b11e1e110bdbca10d23d56b6 (patch) | |
| tree | 8ee393450a1a139c6770a57f13aafcd1c4d8c729 | |
| parent | fbb49b43fd6a3472fffb6ad7f0ecea94a4727926 (diff) | |
| parent | 25a2d83fbd2952f5cb0e9f8840d3379f11ee402c (diff) | |
| download | rabbitmq-server-git-113bc016ac222151b11e1e110bdbca10d23d56b6.tar.gz | |
Merge branch 'master' into rabbitmq-server-550
| -rw-r--r-- | .gitignore | 2 | ||||
| -rw-r--r-- | scripts/rabbitmq-env | 8 | ||||
| -rw-r--r-- | scripts/rabbitmq-script-wrapper | 7 | ||||
| -rw-r--r-- | src/rabbit_alarm.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_sync.erl | 59 |
5 files changed, 62 insertions, 16 deletions
diff --git a/.gitignore b/.gitignore index 4a613c6d54..9e0bfad27d 100644 --- a/.gitignore +++ b/.gitignore @@ -42,3 +42,5 @@ rabbitmq-server-*.zip # Tracing tools *-ttb *.ti + +PACKAGES/* diff --git a/scripts/rabbitmq-env b/scripts/rabbitmq-env index ad06839bfc..9a2b7acc3c 100644 --- a/scripts/rabbitmq-env +++ b/scripts/rabbitmq-env @@ -103,15 +103,11 @@ fi ##--- Set environment vars RABBITMQ_<var_name> to defaults if not set -SED_OPT="-E" -if [ $(uname -s) = "Linux" ]; then - SED_OPT="-r" -fi - rmq_normalize_path() { local path=$1 - echo "$path" | sed $SED_OPT -e 's,//+,/,g' -e 's,(.)/$,\1,' + # Remove redundant slashes and strip a trailing slash + echo "$path" | sed -e 's#/\{2,\}#/#g' -e 's#/$##' } rmq_normalize_path_var() { diff --git a/scripts/rabbitmq-script-wrapper b/scripts/rabbitmq-script-wrapper index ed4c276e53..9623f01709 100644 --- a/scripts/rabbitmq-script-wrapper +++ b/scripts/rabbitmq-script-wrapper @@ -15,14 +15,9 @@ ## Copyright (c) 2007-2015 Pivotal Software, Inc. All rights reserved. ## -SED_OPT="-E" -if [ $(uname -s) = "Linux" ]; then - SED_OPT="-r" -fi - for arg in "$@" ; do # Wrap each arg in single quotes and wrap single quotes in double quotes, so that they're passed through cleanly. - arg=`printf %s "$arg" | sed $SED_OPT -e "s/'/'\"'\"'/g"` + arg=`printf %s "$arg" | sed -e "s#'#'\"'\"'#g"` CMDLINE="${CMDLINE} '${arg}'" done diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl index c11b6e4383..30743ea243 100644 --- a/src/rabbit_alarm.erl +++ b/src/rabbit_alarm.erl @@ -50,7 +50,7 @@ alarms :: [alarm()]}). -type(local_alarm() :: 'file_descriptor_limit'). --type(resource_alarm_source() :: 'disk' | 'node'). +-type(resource_alarm_source() :: 'disk' | 'memory'). -type(resource_alarm() :: {resource_limit, resource_alarm_source(), node()}). -type(alarm() :: local_alarm() | resource_alarm()). diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl index 2d8bdfa860..a97a9b50c8 100644 --- a/src/rabbit_mirror_queue_sync.erl +++ b/src/rabbit_mirror_queue_sync.erl @@ -18,7 +18,7 @@ -include("rabbit.hrl"). --export([master_prepare/4, master_go/8, slave/7]). +-export([master_prepare/4, master_go/8, slave/7, conserve_resources/3]). -define(SYNC_PROGRESS_INTERVAL, 1000000). @@ -198,7 +198,7 @@ syncer(Ref, Log, MPid, SPids) -> [] -> Log("all slaves already synced", []); SPids1 -> MPid ! {ready, self()}, Log("mirrors ~p to sync", [[node(SPid) || SPid <- SPids1]]), - syncer_loop(Ref, MPid, SPids1) + syncer_check_resources(Ref, MPid, SPids1) end. await_slaves(Ref, SPids) -> @@ -217,12 +217,43 @@ await_slaves(Ref, SPids) -> %% 'sync_start' and so will not reply. We need to act as though they are %% down. +syncer_check_resources(Ref, MPid, SPids) -> + rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}), + %% Before we ask the master node to send the first batch of messages + %% over here, we check if one node is already short on memory. If + %% that's the case, we wait for the alarm to be cleared before + %% starting the syncer loop. + AlarmedNodes = lists:any( + fun + ({{resource_limit, memory, _}, _}) -> true; + ({_, _}) -> false + end, rabbit_alarm:get_alarms()), + if + not AlarmedNodes -> + MPid ! {next, Ref}, + syncer_loop(Ref, MPid, SPids); + true -> + case wait_for_resources(Ref, SPids) of + cancel -> ok; + SPids1 -> MPid ! {next, Ref}, + syncer_loop(Ref, MPid, SPids1) + end + end. + syncer_loop(Ref, MPid, SPids) -> - MPid ! {next, Ref}, receive + {conserve_resources, memory, true} -> + case wait_for_resources(Ref, SPids) of + cancel -> ok; + SPids1 -> syncer_loop(Ref, MPid, SPids1) + end; + {conserve_resources, _, _} -> + %% Ignore other alerts. + syncer_loop(Ref, MPid, SPids); {msgs, Ref, Msgs} -> SPids1 = wait_for_credit(SPids), broadcast(SPids1, {sync_msgs, Ref, Msgs}), + MPid ! {next, Ref}, syncer_loop(Ref, MPid, SPids1); {cancel, Ref} -> %% We don't tell the slaves we will die - so when we do @@ -239,6 +270,10 @@ broadcast(SPids, Msg) -> SPid ! Msg end || SPid <- SPids]. +conserve_resources(Pid, Source, {_, Conserve, _}) -> + Pid ! {conserve_resources, Source, Conserve}, + ok. + wait_for_credit(SPids) -> case credit_flow:blocked() of true -> receive @@ -252,6 +287,24 @@ wait_for_credit(SPids) -> false -> SPids end. +wait_for_resources(Ref, SPids) -> + receive + {conserve_resources, memory, false} -> + SPids; + {conserve_resources, _, _} -> + %% Ignore other alerts. + wait_for_resources(Ref, SPids); + {cancel, Ref} -> + %% We don't tell the slaves we will die - so when we do + %% they interpret that as a failure, which is what we + %% want. + cancel; + {'DOWN', _, process, SPid, _} -> + credit_flow:peer_down(SPid), + SPids1 = wait_for_credit(lists:delete(SPid, SPids)), + wait_for_resources(Ref, SPids1) + end. + %% Syncer %% --------------------------------------------------------------------------- %% Slave |
