diff options
| author | Michael Klishin <mklishin@pivotal.io> | 2016-02-18 18:30:18 +0300 |
|---|---|---|
| committer | Michael Klishin <mklishin@pivotal.io> | 2016-02-18 18:30:18 +0300 |
| commit | c66d51f4f57c425bfa2fa152bd4895ed56915ceb (patch) | |
| tree | 0214fe81f65323a7eda762306bf443bbe0a4e6b7 | |
| parent | 4152ecbc7a7a4d0223a4f51f542149da04f37692 (diff) | |
| parent | 01e039f4dd2b6d78d9d67f7472bc06394f320fc3 (diff) | |
| download | rabbitmq-server-git-c66d51f4f57c425bfa2fa152bd4895ed56915ceb.tar.gz | |
Merge branch 'stable' into rabbitmq-server-248
| -rw-r--r-- | scripts/rabbitmq-env | 8 | ||||
| -rw-r--r-- | scripts/rabbitmq-env.bat | 10 | ||||
| -rw-r--r-- | scripts/rabbitmq-script-wrapper | 7 | ||||
| -rwxr-xr-x | scripts/rabbitmq-server-ha.ocf | 79 | ||||
| -rw-r--r-- | src/rabbit_alarm.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_error_logger.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_sync.erl | 59 |
7 files changed, 115 insertions, 54 deletions
diff --git a/scripts/rabbitmq-env b/scripts/rabbitmq-env index 4c1d0c9da2..dffed035ea 100644 --- a/scripts/rabbitmq-env +++ b/scripts/rabbitmq-env @@ -103,15 +103,11 @@ fi ##--- Set environment vars RABBITMQ_<var_name> to defaults if not set -SED_OPT="-E" -if [ $(uname -s) = "Linux" ]; then - SED_OPT="-r" -fi - rmq_normalize_path() { local path=$1 - echo "$path" | sed $SED_OPT -e 's,//+,/,g' -e 's,(.)/$,\1,' + # Remove redundant slashes and strip a trailing slash + echo "$path" | sed -e 's#/\{2,\}#/#g' -e 's#/$##' } rmq_normalize_path_var() { diff --git a/scripts/rabbitmq-env.bat b/scripts/rabbitmq-env.bat index 66a1daf851..d5df9ddbd6 100644 --- a/scripts/rabbitmq-env.bat +++ b/scripts/rabbitmq-env.bat @@ -77,6 +77,8 @@ if "!RABBITMQ_NODENAME!"=="" ( if "!NODENAME!"=="" (
REM We use Erlang to query the local hostname because
REM !COMPUTERNAME! and Erlang may return different results.
+ REM Start erl with -sname to make sure epmd is started.
+ call "%ERLANG_HOME%\bin\erl.exe" -A0 -noinput -boot start_clean -sname rabbit-prelaunch-epmd -eval "init:stop()." >nul 2>&1
for /f "delims=" %%F in ('call "%ERLANG_HOME%\bin\erl.exe" -A0 -noinput -boot start_clean -eval "net_kernel:start([list_to_atom(""rabbit-gethostname-"" ++ os:getpid()), %NAMETYPE%]), [_, H] = string:tokens(atom_to_list(node()), ""@""), io:format(""~s~n"", [H]), init:stop()."') do @set HOSTNAME=%%F
set RABBITMQ_NODENAME=rabbit@!HOSTNAME!
set HOSTNAME=
@@ -224,10 +226,10 @@ if "!RABBITMQ_PLUGINS_EXPAND_DIR!"=="" ( set RABBITMQ_PLUGINS_EXPAND_DIR=!PLUGINS_EXPAND_DIR!
)
)
-if not exist "!RABBITMQ_PLUGINS_EXPAND_DIR!" (
- mkdir "!RABBITMQ_PLUGINS_EXPAND_DIR!"
-)
-for /f "delims=" %%F in ("!RABBITMQ_PLUGINS_EXPAND_DIR!") do set RABBITMQ_PLUGINS_EXPAND_DIR=%%~sF
+REM FIXME: RabbitMQ removes and recreates RABBITMQ_PLUGINS_EXPAND_DIR
+REM itself. Therefore we can't create it here in advance and escape the
+REM directory name, and RABBITMQ_PLUGINS_EXPAND_DIR must not contain
+REM non-US-ASCII characters.
REM [ "x" = "x$RABBITMQ_ENABLED_PLUGINS_FILE" ] && RABBITMQ_ENABLED_PLUGINS_FILE=${ENABLED_PLUGINS_FILE}
if "!RABBITMQ_ENABLED_PLUGINS_FILE!"=="" (
diff --git a/scripts/rabbitmq-script-wrapper b/scripts/rabbitmq-script-wrapper index ed4c276e53..9623f01709 100644 --- a/scripts/rabbitmq-script-wrapper +++ b/scripts/rabbitmq-script-wrapper @@ -15,14 +15,9 @@ ## Copyright (c) 2007-2015 Pivotal Software, Inc. All rights reserved. ## -SED_OPT="-E" -if [ $(uname -s) = "Linux" ]; then - SED_OPT="-r" -fi - for arg in "$@" ; do # Wrap each arg in single quotes and wrap single quotes in double quotes, so that they're passed through cleanly. - arg=`printf %s "$arg" | sed $SED_OPT -e "s/'/'\"'\"'/g"` + arg=`printf %s "$arg" | sed -e "s#'#'\"'\"'#g"` CMDLINE="${CMDLINE} '${arg}'" done diff --git a/scripts/rabbitmq-server-ha.ocf b/scripts/rabbitmq-server-ha.ocf index 1fc4c777df..5505c10581 100755 --- a/scripts/rabbitmq-server-ha.ocf +++ b/scripts/rabbitmq-server-ha.ocf @@ -407,8 +407,8 @@ proc_kill() # OCF_SUCCESS # LL # Arguments: -# $1 - pidfile or pid -# $2 - service name used for logging +# $1 - pidfile or pid or 'none', if stopping by the name matching +# $2 - service name used for logging or for the failback stopping method # $3 - stop process timeout (in sec), used to determine how many times we try # SIGTERM and an upper limit on how long this function should try and # stop the process. Defaults to 15. @@ -425,16 +425,20 @@ proc_stop() local i local pid local pidfile - # check if provide just a number - echo "${pid_param}" | egrep -q '^[0-9]+$' - if [ $? -eq 0 ]; then - pid="${pid_param}" - elif [ -e "${pid_param}" ]; then # check if passed in a pid file - pidfile="${pid_param}" - pid=$(cat "${pidfile}" 2>/dev/null | tr -s " " "\n" | sort -u) - else - ocf_log warn "${LH} pid param ${pid_param} is not a file or a number, try match by ${service_name}" + if [ "${pid_param}" = "none" ] ; then pid="none" + else + # check if provide just a number + echo "${pid_param}" | egrep -q '^[0-9]+$' + if [ $? -eq 0 ]; then + pid="${pid_param}" + elif [ -e "${pid_param}" ]; then # check if passed in a pid file + pidfile="${pid_param}" + pid=$(cat "${pidfile}" 2>/dev/null | tr -s " " "\n" | sort -u) + else + ocf_log warn "${LH} pid param ${pid_param} is not a file or a number, try match by ${service_name}" + pid="none" + fi fi # number of times to try a SIGTEM is (timeout - 5 seconds) / 2 seconds local stop_count=$(( ($timeout-5)/2 )) @@ -790,10 +794,14 @@ update_cookie() { return $OCF_SUCCESS } -# Stop rmq beam process by pid or rabbit node name match. Returns SUCCESS/ERROR +# Stop rmq beam process by pid and by rabbit node name match. Returns SUCCESS/ERROR kill_rmq_and_remove_pid() { local LH="${LL} kill_rmq_and_remove_pid():" + # Stop the rabbitmq-server by its pidfile, use the name matching as a fallback, + # and ignore the exit code proc_stop "${OCF_RESKEY_pid_file}" "beam.*${RABBITMQ_NODENAME}" "${OCF_RESKEY_stop_time}" + # Ensure the beam.smp stopped by the rabbit node name matching as well + proc_stop none "beam.*${RABBITMQ_NODENAME}" "${OCF_RESKEY_stop_time}" if [ $? -eq 0 ] ; then return $OCF_SUCCESS else @@ -967,9 +975,11 @@ stop_server_process() { [ $? -eq 0 ] && ocf_log info "${LH} RMQ-server process (PID=${pid}) stopped succesfully." fi - if [ -f ${OCF_RESKEY_pid_file} ] ; then - # Ensure there is no beam process and pidfile left - ocf_log warn "${LH} The pidfile still exists, forcing the RMQ-server cleanup" + # Ensure there is no beam process and pidfile left + pgrep -f "beam.*${RABBITMQ_NODENAME}" > /dev/null + rc=$? + if [ -f ${OCF_RESKEY_pid_file} -o $rc -eq 0 ] ; then + ocf_log warn "${LH} The pidfile or beam's still exist, forcing the RMQ-server cleanup" kill_rmq_and_remove_pid fi @@ -1399,29 +1409,32 @@ get_monitor() { if [ $rabbit_running -eq $OCF_SUCCESS ] then ocf_log info "${LH} rabbit app is running. checking if we are the part of healthy cluster" - rc_check=$OCF_ERR_GENERIC - nodelist=$(get_alive_pacemaker_nodes_but) - for node in $nodelist - do - status_master=1 - # Do not refetch the master status for *this* node as we know it already - if [ $rc -ne $OCF_RUNNING_MASTER ] ; then + + if [ $rc -eq $OCF_RUNNING_MASTER ] ; then + # The master is always running inside of its cluster + ocf_log info "${LH} rabbit app is running and is master of cluster" + rc_check=$OCF_SUCCESS + else + rc_check=$OCF_ERR_GENERIC + nodelist=$(get_alive_pacemaker_nodes_but) + for node in $nodelist + do ocf_log info "${LH} rabbit app is running. looking for master on $node" is_master $node status_master=$? ocf_log info "${LH} fetched master attribute for $node. attr value is ${status_master}" - fi - if [ $status_master -eq 0 ] ; then - ocf_log info "${LH} rabbit app is running. master is $node" - if get_running_nodes | grep -q $(rabbit_node_name $node) - then - ocf_log info "${LH} rabbit app is running and is member of healthy cluster" - rc_check=$OCF_SUCCESS - break + if [ $status_master -eq 0 ] ; then + ocf_log info "${LH} rabbit app is running. master is $node" + if get_running_nodes | grep -q $(rabbit_node_name $node) + then + ocf_log info "${LH} rabbit app is running and is member of healthy cluster" + rc_check=$OCF_SUCCESS + break + fi fi - fi - done - [ $rc_check -eq $OCF_ERR_GENERIC ] && ocf_log err "${LH} rabbit node is running out of the cluster" + done + [ $rc_check -eq $OCF_ERR_GENERIC ] && ocf_log err "${LH} rabbit node is running out of the cluster" + fi else if [ "$OCF_CHECK_LEVEL" -gt 20 ]; then ocf_log info "${LH} rabbit app is not running. checking if there is a master" diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl index c11b6e4383..30743ea243 100644 --- a/src/rabbit_alarm.erl +++ b/src/rabbit_alarm.erl @@ -50,7 +50,7 @@ alarms :: [alarm()]}). -type(local_alarm() :: 'file_descriptor_limit'). --type(resource_alarm_source() :: 'disk' | 'node'). +-type(resource_alarm_source() :: 'disk' | 'memory'). -type(resource_alarm() :: {resource_limit, resource_alarm_source(), node()}). -type(alarm() :: local_alarm() | resource_alarm()). diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl index bbaf9577f9..d847284243 100644 --- a/src/rabbit_error_logger.erl +++ b/src/rabbit_error_logger.erl @@ -104,9 +104,11 @@ publish1(RoutingKey, Format, Data, LogExch) -> Timestamp = time_compat:os_system_time(seconds), Args = [truncate:term(A, ?LOG_TRUNC) || A <- Data], + Headers = [{<<"node">>, longstr, list_to_binary(atom_to_list(node()))}], {ok, _DeliveredQPids} = rabbit_basic:publish(LogExch, RoutingKey, #'P_basic'{content_type = <<"text/plain">>, - timestamp = Timestamp}, + timestamp = Timestamp, + headers = Headers}, list_to_binary(io_lib:format(Format, Args))), ok. diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl index 2d8bdfa860..a97a9b50c8 100644 --- a/src/rabbit_mirror_queue_sync.erl +++ b/src/rabbit_mirror_queue_sync.erl @@ -18,7 +18,7 @@ -include("rabbit.hrl"). --export([master_prepare/4, master_go/8, slave/7]). +-export([master_prepare/4, master_go/8, slave/7, conserve_resources/3]). -define(SYNC_PROGRESS_INTERVAL, 1000000). @@ -198,7 +198,7 @@ syncer(Ref, Log, MPid, SPids) -> [] -> Log("all slaves already synced", []); SPids1 -> MPid ! {ready, self()}, Log("mirrors ~p to sync", [[node(SPid) || SPid <- SPids1]]), - syncer_loop(Ref, MPid, SPids1) + syncer_check_resources(Ref, MPid, SPids1) end. await_slaves(Ref, SPids) -> @@ -217,12 +217,43 @@ await_slaves(Ref, SPids) -> %% 'sync_start' and so will not reply. We need to act as though they are %% down. +syncer_check_resources(Ref, MPid, SPids) -> + rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}), + %% Before we ask the master node to send the first batch of messages + %% over here, we check if one node is already short on memory. If + %% that's the case, we wait for the alarm to be cleared before + %% starting the syncer loop. + AlarmedNodes = lists:any( + fun + ({{resource_limit, memory, _}, _}) -> true; + ({_, _}) -> false + end, rabbit_alarm:get_alarms()), + if + not AlarmedNodes -> + MPid ! {next, Ref}, + syncer_loop(Ref, MPid, SPids); + true -> + case wait_for_resources(Ref, SPids) of + cancel -> ok; + SPids1 -> MPid ! {next, Ref}, + syncer_loop(Ref, MPid, SPids1) + end + end. + syncer_loop(Ref, MPid, SPids) -> - MPid ! {next, Ref}, receive + {conserve_resources, memory, true} -> + case wait_for_resources(Ref, SPids) of + cancel -> ok; + SPids1 -> syncer_loop(Ref, MPid, SPids1) + end; + {conserve_resources, _, _} -> + %% Ignore other alerts. + syncer_loop(Ref, MPid, SPids); {msgs, Ref, Msgs} -> SPids1 = wait_for_credit(SPids), broadcast(SPids1, {sync_msgs, Ref, Msgs}), + MPid ! {next, Ref}, syncer_loop(Ref, MPid, SPids1); {cancel, Ref} -> %% We don't tell the slaves we will die - so when we do @@ -239,6 +270,10 @@ broadcast(SPids, Msg) -> SPid ! Msg end || SPid <- SPids]. +conserve_resources(Pid, Source, {_, Conserve, _}) -> + Pid ! {conserve_resources, Source, Conserve}, + ok. + wait_for_credit(SPids) -> case credit_flow:blocked() of true -> receive @@ -252,6 +287,24 @@ wait_for_credit(SPids) -> false -> SPids end. +wait_for_resources(Ref, SPids) -> + receive + {conserve_resources, memory, false} -> + SPids; + {conserve_resources, _, _} -> + %% Ignore other alerts. + wait_for_resources(Ref, SPids); + {cancel, Ref} -> + %% We don't tell the slaves we will die - so when we do + %% they interpret that as a failure, which is what we + %% want. + cancel; + {'DOWN', _, process, SPid, _} -> + credit_flow:peer_down(SPid), + SPids1 = wait_for_credit(lists:delete(SPid, SPids)), + wait_for_resources(Ref, SPids1) + end. + %% Syncer %% --------------------------------------------------------------------------- %% Slave |
