summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
-rw-r--r-- scripts/rabbitmq-env 8
-rw-r--r-- scripts/rabbitmq-env.bat 10
-rw-r--r-- scripts/rabbitmq-script-wrapper 7
-rwxr-xr-x scripts/rabbitmq-server-ha.ocf 79
-rw-r--r-- src/rabbit_alarm.erl 2
-rw-r--r-- src/rabbit_error_logger.erl 4
-rw-r--r-- src/rabbit_mirror_queue_sync.erl 59
7 files changed, 115 insertions, 54 deletions
diff --git a/scripts/rabbitmq-env b/scripts/rabbitmq-env
index 4c1d0c9da2..dffed035ea 100644
--- a/scripts/rabbitmq-env
+++ b/scripts/rabbitmq-env
@@ -103,15 +103,11 @@ fi
##--- Set environment vars RABBITMQ_<var_name> to defaults if not set
-SED_OPT="-E"
-if [ $(uname -s) = "Linux" ]; then
- SED_OPT="-r"
-fi
-
rmq_normalize_path() {
local path=$1
- echo "$path" | sed $SED_OPT -e 's,//+,/,g' -e 's,(.)/$,\1,'
+ # Remove redundant slashes and strip a trailing slash
+ echo "$path" | sed -e 's#/\{2,\}#/#g' -e 's#/$##'
}
rmq_normalize_path_var() {
diff --git a/scripts/rabbitmq-env.bat b/scripts/rabbitmq-env.bat
index 66a1daf851..d5df9ddbd6 100644
--- a/scripts/rabbitmq-env.bat
+++ b/scripts/rabbitmq-env.bat
@@ -77,6 +77,8 @@ if "!RABBITMQ_NODENAME!"=="" (
if "!NODENAME!"=="" (
REM We use Erlang to query the local hostname because
REM !COMPUTERNAME! and Erlang may return different results.
+ REM Start erl with -sname to make sure epmd is started.
+ call "%ERLANG_HOME%\bin\erl.exe" -A0 -noinput -boot start_clean -sname rabbit-prelaunch-epmd -eval "init:stop()." >nul 2>&1
for /f "delims=" %%F in ('call "%ERLANG_HOME%\bin\erl.exe" -A0 -noinput -boot start_clean -eval "net_kernel:start([list_to_atom(""rabbit-gethostname-"" ++ os:getpid()), %NAMETYPE%]), [_, H] = string:tokens(atom_to_list(node()), ""@""), io:format(""~s~n"", [H]), init:stop()."') do @set HOSTNAME=%%F
set RABBITMQ_NODENAME=rabbit@!HOSTNAME!
set HOSTNAME=
@@ -224,10 +226,10 @@ if "!RABBITMQ_PLUGINS_EXPAND_DIR!"=="" (
set RABBITMQ_PLUGINS_EXPAND_DIR=!PLUGINS_EXPAND_DIR!
)
)
-if not exist "!RABBITMQ_PLUGINS_EXPAND_DIR!" (
- mkdir "!RABBITMQ_PLUGINS_EXPAND_DIR!"
-)
-for /f "delims=" %%F in ("!RABBITMQ_PLUGINS_EXPAND_DIR!") do set RABBITMQ_PLUGINS_EXPAND_DIR=%%~sF
+REM FIXME: RabbitMQ removes and recreates RABBITMQ_PLUGINS_EXPAND_DIR
+REM itself. Therefore we can't create it here in advance and escape the
+REM directory name, and RABBITMQ_PLUGINS_EXPAND_DIR must not contain
+REM non-US-ASCII characters.
REM [ "x" = "x$RABBITMQ_ENABLED_PLUGINS_FILE" ] && RABBITMQ_ENABLED_PLUGINS_FILE=${ENABLED_PLUGINS_FILE}
if "!RABBITMQ_ENABLED_PLUGINS_FILE!"=="" (
diff --git a/scripts/rabbitmq-script-wrapper b/scripts/rabbitmq-script-wrapper
index ed4c276e53..9623f01709 100644
--- a/scripts/rabbitmq-script-wrapper
+++ b/scripts/rabbitmq-script-wrapper
@@ -15,14 +15,9 @@
## Copyright (c) 2007-2015 Pivotal Software, Inc. All rights reserved.
##
-SED_OPT="-E"
-if [ $(uname -s) = "Linux" ]; then
- SED_OPT="-r"
-fi
-
for arg in "$@" ; do
# Wrap each arg in single quotes and wrap single quotes in double quotes, so that they're passed through cleanly.
- arg=`printf %s "$arg" | sed $SED_OPT -e "s/'/'\"'\"'/g"`
+ arg=`printf %s "$arg" | sed -e "s#'#'\"'\"'#g"`
CMDLINE="${CMDLINE} '${arg}'"
done
diff --git a/scripts/rabbitmq-server-ha.ocf b/scripts/rabbitmq-server-ha.ocf
index 1fc4c777df..5505c10581 100755
--- a/scripts/rabbitmq-server-ha.ocf
+++ b/scripts/rabbitmq-server-ha.ocf
@@ -407,8 +407,8 @@ proc_kill()
# OCF_SUCCESS
# LL
# Arguments:
-# $1 - pidfile or pid
-# $2 - service name used for logging
+# $1 - pidfile, pid, or 'none' to stop by name matching only
+# $2 - service name used for logging and for the fallback stopping method
# $3 - stop process timeout (in sec), used to determine how many times we try
# SIGTERM and an upper limit on how long this function should try and
# stop the process. Defaults to 15.
@@ -425,16 +425,20 @@ proc_stop()
local i
local pid
local pidfile
- # check if provide just a number
- echo "${pid_param}" | egrep -q '^[0-9]+$'
- if [ $? -eq 0 ]; then
- pid="${pid_param}"
- elif [ -e "${pid_param}" ]; then # check if passed in a pid file
- pidfile="${pid_param}"
- pid=$(cat "${pidfile}" 2>/dev/null | tr -s " " "\n" | sort -u)
- else
- ocf_log warn "${LH} pid param ${pid_param} is not a file or a number, try match by ${service_name}"
+ if [ "${pid_param}" = "none" ] ; then
pid="none"
+ else
+ # check if provide just a number
+ echo "${pid_param}" | egrep -q '^[0-9]+$'
+ if [ $? -eq 0 ]; then
+ pid="${pid_param}"
+ elif [ -e "${pid_param}" ]; then # check if passed in a pid file
+ pidfile="${pid_param}"
+ pid=$(cat "${pidfile}" 2>/dev/null | tr -s " " "\n" | sort -u)
+ else
+ ocf_log warn "${LH} pid param ${pid_param} is not a file or a number, try match by ${service_name}"
+ pid="none"
+ fi
fi
# number of times to try a SIGTEM is (timeout - 5 seconds) / 2 seconds
local stop_count=$(( ($timeout-5)/2 ))
@@ -790,10 +794,14 @@ update_cookie() {
return $OCF_SUCCESS
}
-# Stop rmq beam process by pid or rabbit node name match. Returns SUCCESS/ERROR
+# Stop rmq beam process by pid and by rabbit node name match. Returns SUCCESS/ERROR
kill_rmq_and_remove_pid() {
local LH="${LL} kill_rmq_and_remove_pid():"
+ # Stop the rabbitmq-server by its pidfile, use the name matching as a fallback,
+ # and ignore the exit code
proc_stop "${OCF_RESKEY_pid_file}" "beam.*${RABBITMQ_NODENAME}" "${OCF_RESKEY_stop_time}"
+ # Also stop any beam.smp process matched by the rabbit node name
+ proc_stop none "beam.*${RABBITMQ_NODENAME}" "${OCF_RESKEY_stop_time}"
if [ $? -eq 0 ] ; then
return $OCF_SUCCESS
else
@@ -967,9 +975,11 @@ stop_server_process() {
[ $? -eq 0 ] && ocf_log info "${LH} RMQ-server process (PID=${pid}) stopped succesfully."
fi
- if [ -f ${OCF_RESKEY_pid_file} ] ; then
- # Ensure there is no beam process and pidfile left
- ocf_log warn "${LH} The pidfile still exists, forcing the RMQ-server cleanup"
+ # Ensure there is no beam process and pidfile left
+ pgrep -f "beam.*${RABBITMQ_NODENAME}" > /dev/null
+ rc=$?
+ if [ -f ${OCF_RESKEY_pid_file} -o $rc -eq 0 ] ; then
+ ocf_log warn "${LH} The pidfile or beam's still exist, forcing the RMQ-server cleanup"
kill_rmq_and_remove_pid
fi
@@ -1399,29 +1409,32 @@ get_monitor() {
if [ $rabbit_running -eq $OCF_SUCCESS ]
then
ocf_log info "${LH} rabbit app is running. checking if we are the part of healthy cluster"
- rc_check=$OCF_ERR_GENERIC
- nodelist=$(get_alive_pacemaker_nodes_but)
- for node in $nodelist
- do
- status_master=1
- # Do not refetch the master status for *this* node as we know it already
- if [ $rc -ne $OCF_RUNNING_MASTER ] ; then
+
+ if [ $rc -eq $OCF_RUNNING_MASTER ] ; then
+ # The master is always running inside of its cluster
+ ocf_log info "${LH} rabbit app is running and is master of cluster"
+ rc_check=$OCF_SUCCESS
+ else
+ rc_check=$OCF_ERR_GENERIC
+ nodelist=$(get_alive_pacemaker_nodes_but)
+ for node in $nodelist
+ do
ocf_log info "${LH} rabbit app is running. looking for master on $node"
is_master $node
status_master=$?
ocf_log info "${LH} fetched master attribute for $node. attr value is ${status_master}"
- fi
- if [ $status_master -eq 0 ] ; then
- ocf_log info "${LH} rabbit app is running. master is $node"
- if get_running_nodes | grep -q $(rabbit_node_name $node)
- then
- ocf_log info "${LH} rabbit app is running and is member of healthy cluster"
- rc_check=$OCF_SUCCESS
- break
+ if [ $status_master -eq 0 ] ; then
+ ocf_log info "${LH} rabbit app is running. master is $node"
+ if get_running_nodes | grep -q $(rabbit_node_name $node)
+ then
+ ocf_log info "${LH} rabbit app is running and is member of healthy cluster"
+ rc_check=$OCF_SUCCESS
+ break
+ fi
fi
- fi
- done
- [ $rc_check -eq $OCF_ERR_GENERIC ] && ocf_log err "${LH} rabbit node is running out of the cluster"
+ done
+ [ $rc_check -eq $OCF_ERR_GENERIC ] && ocf_log err "${LH} rabbit node is running out of the cluster"
+ fi
else
if [ "$OCF_CHECK_LEVEL" -gt 20 ]; then
ocf_log info "${LH} rabbit app is not running. checking if there is a master"
diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl
index c11b6e4383..30743ea243 100644
--- a/src/rabbit_alarm.erl
+++ b/src/rabbit_alarm.erl
@@ -50,7 +50,7 @@
alarms :: [alarm()]}).
-type(local_alarm() :: 'file_descriptor_limit').
--type(resource_alarm_source() :: 'disk' | 'node').
+-type(resource_alarm_source() :: 'disk' | 'memory').
-type(resource_alarm() :: {resource_limit, resource_alarm_source(), node()}).
-type(alarm() :: local_alarm() | resource_alarm()).
diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl
index bbaf9577f9..d847284243 100644
--- a/src/rabbit_error_logger.erl
+++ b/src/rabbit_error_logger.erl
@@ -104,9 +104,11 @@ publish1(RoutingKey, Format, Data, LogExch) ->
Timestamp = time_compat:os_system_time(seconds),
Args = [truncate:term(A, ?LOG_TRUNC) || A <- Data],
+ Headers = [{<<"node">>, longstr, list_to_binary(atom_to_list(node()))}],
{ok, _DeliveredQPids} =
rabbit_basic:publish(LogExch, RoutingKey,
#'P_basic'{content_type = <<"text/plain">>,
- timestamp = Timestamp},
+ timestamp = Timestamp,
+ headers = Headers},
list_to_binary(io_lib:format(Format, Args))),
ok.
diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl
index 2d8bdfa860..a97a9b50c8 100644
--- a/src/rabbit_mirror_queue_sync.erl
+++ b/src/rabbit_mirror_queue_sync.erl
@@ -18,7 +18,7 @@
-include("rabbit.hrl").
--export([master_prepare/4, master_go/8, slave/7]).
+-export([master_prepare/4, master_go/8, slave/7, conserve_resources/3]).
-define(SYNC_PROGRESS_INTERVAL, 1000000).
@@ -198,7 +198,7 @@ syncer(Ref, Log, MPid, SPids) ->
[] -> Log("all slaves already synced", []);
SPids1 -> MPid ! {ready, self()},
Log("mirrors ~p to sync", [[node(SPid) || SPid <- SPids1]]),
- syncer_loop(Ref, MPid, SPids1)
+ syncer_check_resources(Ref, MPid, SPids1)
end.
await_slaves(Ref, SPids) ->
@@ -217,12 +217,43 @@ await_slaves(Ref, SPids) ->
%% 'sync_start' and so will not reply. We need to act as though they are
%% down.
+syncer_check_resources(Ref, MPid, SPids) ->
+ rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}),
+ %% Before we ask the master node to send the first batch of messages
+ %% over here, we check whether a memory alarm is already raised on
+ %% any node. If that's the case, we wait for the alarm to be cleared
+ %% before starting the syncer loop.
+ AlarmedNodes = lists:any(
+ fun
+ ({{resource_limit, memory, _}, _}) -> true;
+ ({_, _}) -> false
+ end, rabbit_alarm:get_alarms()),
+ if
+ not AlarmedNodes ->
+ MPid ! {next, Ref},
+ syncer_loop(Ref, MPid, SPids);
+ true ->
+ case wait_for_resources(Ref, SPids) of
+ cancel -> ok;
+ SPids1 -> MPid ! {next, Ref},
+ syncer_loop(Ref, MPid, SPids1)
+ end
+ end.
+
syncer_loop(Ref, MPid, SPids) ->
- MPid ! {next, Ref},
receive
+ {conserve_resources, memory, true} ->
+ case wait_for_resources(Ref, SPids) of
+ cancel -> ok;
+ SPids1 -> syncer_loop(Ref, MPid, SPids1)
+ end;
+ {conserve_resources, _, _} ->
+ %% Ignore other alerts.
+ syncer_loop(Ref, MPid, SPids);
{msgs, Ref, Msgs} ->
SPids1 = wait_for_credit(SPids),
broadcast(SPids1, {sync_msgs, Ref, Msgs}),
+ MPid ! {next, Ref},
syncer_loop(Ref, MPid, SPids1);
{cancel, Ref} ->
%% We don't tell the slaves we will die - so when we do
@@ -239,6 +270,10 @@ broadcast(SPids, Msg) ->
SPid ! Msg
end || SPid <- SPids].
+conserve_resources(Pid, Source, {_, Conserve, _}) ->
+ Pid ! {conserve_resources, Source, Conserve},
+ ok.
+
wait_for_credit(SPids) ->
case credit_flow:blocked() of
true -> receive
@@ -252,6 +287,24 @@ wait_for_credit(SPids) ->
false -> SPids
end.
+wait_for_resources(Ref, SPids) ->
+ receive
+ {conserve_resources, memory, false} ->
+ SPids;
+ {conserve_resources, _, _} ->
+ %% Ignore other alerts.
+ wait_for_resources(Ref, SPids);
+ {cancel, Ref} ->
+ %% We don't tell the slaves we will die - so when we do
+ %% they interpret that as a failure, which is what we
+ %% want.
+ cancel;
+ {'DOWN', _, process, SPid, _} ->
+ credit_flow:peer_down(SPid),
+ SPids1 = wait_for_credit(lists:delete(SPid, SPids)),
+ wait_for_resources(Ref, SPids1)
+ end.
+
%% Syncer
%% ---------------------------------------------------------------------------
%% Slave