summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-xpackaging/common/rabbitmq-server-ha.ocf287
-rw-r--r--src/rabbit.erl3
-rw-r--r--src/rabbit_mirror_queue_misc.erl6
-rw-r--r--src/rabbit_mirror_queue_sync.erl39
-rw-r--r--src/rabbit_nodes.erl6
-rw-r--r--src/rabbit_priority_queue.erl21
-rw-r--r--src/rabbit_queue_decorator.erl2
7 files changed, 314 insertions, 50 deletions
diff --git a/packaging/common/rabbitmq-server-ha.ocf b/packaging/common/rabbitmq-server-ha.ocf
index 8d9346b910..5ccda7b745 100755
--- a/packaging/common/rabbitmq-server-ha.ocf
+++ b/packaging/common/rabbitmq-server-ha.ocf
@@ -30,6 +30,9 @@ OCF_RESKEY_ctl_default="/usr/sbin/rabbitmqctl"
OCF_RESKEY_debug_default=false
OCF_RESKEY_username_default="rabbitmq"
OCF_RESKEY_groupname_default="rabbitmq"
+OCF_RESKEY_admin_user_default="guest"
+OCF_RESKEY_admin_password_default="guest"
+OCF_RESKEY_definitions_dump_file_default="/etc/rabbitmq/definitions"
OCF_RESKEY_pid_file_default="/var/run/rabbitmq/pid"
OCF_RESKEY_log_dir_default="/var/log/rabbitmq"
OCF_RESKEY_mnesia_base_default="/var/lib/rabbitmq/mnesia"
@@ -37,6 +40,7 @@ OCF_RESKEY_node_port_default=5672
OCF_RESKEY_erlang_cookie_default=false
OCF_RESKEY_erlang_cookie_file_default="/var/lib/rabbitmq/.erlang.cookie"
OCF_RESKEY_use_fqdn_default=false
+OCF_RESKEY_max_rabbitmqctl_timeouts_default=1
: ${HA_LOGTAG="lrmd"}
: ${HA_LOGFACILITY="daemon"}
@@ -45,6 +49,9 @@ OCF_RESKEY_use_fqdn_default=false
: ${OCF_RESKEY_debug=${OCF_RESKEY_debug_default}}
: ${OCF_RESKEY_username=${OCF_RESKEY_username_default}}
: ${OCF_RESKEY_groupname=${OCF_RESKEY_groupname_default}}
+: ${OCF_RESKEY_admin_user=${OCF_RESKEY_admin_user_default}}
+: ${OCF_RESKEY_admin_password=${OCF_RESKEY_admin_password_default}}
+: ${OCF_RESKEY_definitions_dump_file=${OCF_RESKEY_definitions_dump_file_default}}
: ${OCF_RESKEY_log_dir=${OCF_RESKEY_log_dir_default}}
: ${OCF_RESKEY_mnesia_base=${OCF_RESKEY_mnesia_base_default}}
: ${OCF_RESKEY_pid_file=${OCF_RESKEY_pid_file_default}}
@@ -52,11 +59,14 @@ OCF_RESKEY_use_fqdn_default=false
: ${OCF_RESKEY_erlang_cookie=${OCF_RESKEY_erlang_cookie_default}}
: ${OCF_RESKEY_erlang_cookie_file=${OCF_RESKEY_erlang_cookie_file_default}}
: ${OCF_RESKEY_use_fqdn=${OCF_RESKEY_use_fqdn_default}}
+: ${OCF_RESKEY_max_rabbitmqctl_timeouts=${OCF_RESKEY_max_rabbitmqctl_timeouts_default}}
#######################################################################
OCF_RESKEY_start_time_default=$((OCF_RESKEY_CRM_meta_timeout / 6000 + 2))
: ${OCF_RESKEY_start_time=${OCF_RESKEY_start_time_default}}
+OCF_RESKEY_stop_time_default=${OCF_RESKEY_start_time_default}
+: ${OCF_RESKEY_stop_time=${OCF_RESKEY_start_time_default}}
OCF_RESKEY_command_timeout_default=""
: ${OCF_RESKEY_command_timeout=${OCF_RESKEY_command_timeout_default}}
TIMEOUT_ARG=$((OCF_RESKEY_CRM_meta_timeout / 6000 + 30))
@@ -141,6 +151,30 @@ RabbitMQ group name
<content type="string" default="${OCF_RESKEY_groupname_default}" />
</parameter>
+<parameter name="admin_user" unique="0" required="0">
+<longdesc lang="en">
+RabbitMQ default admin user for API
+</longdesc>
+<shortdesc lang="en">RabbitMQ admin user</shortdesc>
+<content type="string" default="${OCF_RESKEY_admin_user_default}" />
+</parameter>
+
+<parameter name="admin_password" unique="0" required="0">
+<longdesc lang="en">
+RabbitMQ default admin user password for API
+</longdesc>
+<shortdesc lang="en">RabbitMQ admin password</shortdesc>
+<content type="string" default="${OCF_RESKEY_admin_password_default}" />
+</parameter>
+
+<parameter name="definitions_dump_file" unique="0" required="0">
+<longdesc lang="en">
+RabbitMQ default definitions dump file
+</longdesc>
+<shortdesc lang="en">RabbitMQ definitions dump file</shortdesc>
+<content type="string" default="${OCF_RESKEY_definitions_dump_file}" />
+</parameter>
+
<parameter name="command_timeout" unique="0" required="0">
<longdesc lang="en">
Timeout command arguments for issued commands termination (value is auto evaluated)
@@ -157,6 +191,14 @@ Timeout for start rabbitmq server
<content type="string" default="${OCF_RESKEY_start_time_default}" />
</parameter>
+<parameter name="stop_time" unique="0" required="0">
+<longdesc lang="en">
+Timeout for stopping rabbitmq server
+</longdesc>
+<shortdesc lang="en">Timeout for stopping rabbitmq server</shortdesc>
+<content type="string" default="${OCF_RESKEY_stop_time_default}" />
+</parameter>
+
<parameter name="debug" unique="0" required="0">
<longdesc lang="en">
The debug flag for agent (${OCF_RESKEY_binary}) instance.
@@ -207,6 +249,16 @@ Either to use FQDN or a shortname for the rabbitmq node
<content type="boolean" default="${OCF_RESKEY_erlang_cookie_file_default}" />
</parameter>
+<parameter name="max_rabbitmqctl_timeouts" unique="0" required="0">
+<longdesc lang="en">
+If during monitor call rabbitmqctl times out, the timeout is ignored
+unless it is Nth timeout in a row. Here N is the value of the current parameter.
+If too many timeouts happen in a raw, the monitor call will return with error.
+</longdesc>
+<shortdesc lang="en">Fail only if that many rabbitmqctl timeouts in a row occurred</shortdesc>
+<content type="string" default="${OCF_RESKEY_max_rabbitmqctl_timeouts_default}" />
+</parameter>
+
</parameters>
<actions>
@@ -232,6 +284,13 @@ END
# Invokes the given command as a rabbitmq user and wrapped in the
# timeout command.
su_rabbit_cmd() {
+ local timeout
+ if [ "$1" = "-t" ]; then
+ timeout=="/usr/bin/timeout ${OCF_RESKEY_command_timeout} $2"
+ shift 2
+ else
+ timeout=$COMMAND_TIMEOUT
+ fi
local cmd="${1:-status}"
local LH="${LL} su_rabbit_cmd():"
local rc=1
@@ -242,7 +301,7 @@ su_rabbit_cmd() {
ocf_log debug "${LH} invoking a command: ${cmd}"
su $user -s /bin/sh -c "USER=${user} MAIL=${mail} PWD=${pwd} HOME=${home} LOGNAME=${user} \
- ${COMMAND_TIMEOUT} ${cmd}"
+ ${timeout} ${cmd}"
rc=$?
ocf_log info "${LH} the invoked command exited ${rc}: ${cmd}"
return $rc
@@ -331,6 +390,7 @@ rmq_setup_env() {
RMQ_START_TIME="${MNESIA_FILES}/ocf_server_start_time.txt"
MASTER_FLAG_FILE="${MNESIA_FILES}/ocf_master_for_${OCF_RESOURCE_INSTANCE}"
THIS_PCMK_NODE=`crm_node -n`
+ TOTALVMEM=`free -mt | awk '/Total:/ {print $2}'`
# check and make PID file dir
local PID_DIR=$( dirname $OCF_RESKEY_pid_file )
if [ ! -d ${PID_DIR} ] ; then
@@ -994,12 +1054,17 @@ get_status() {
rc=$?
if [ $rc -ne 0 ] ; then
+ ocf_log info "get_status() failed with code ${rc}. Command output: ${body}"
return $OCF_NOT_RUNNING
fi
if [ "${what}" ] ; then
rc=$OCF_NOT_RUNNING
echo "$body" | grep "\{${what}," 2>&1 > /dev/null && rc=$OCF_SUCCESS
+
+ if [ $rc -ne $OCF_SUCCESS ] ; then
+ ocf_log info "get_status(): app ${what} was not found in command output: ${body}"
+ fi
fi
return $rc
@@ -1025,6 +1090,55 @@ is_master() {
return 0
}
+# Verify if su_rabbit_cmd exited by timeout by checking its return code.
+# If it did not, return 0. If it did AND it is
+# $OCF_RESKEY_max_rabbitmqctl_timeouts'th timeout in a row,
+# return 2 to signal get_monitor that it should
+# exit with error. Otherwise return 1 to signal that there was a timeout,
+# but it should be ignored. Timeouts for different operations are tracked
+# separately. The second argument is used to distingush them.
+check_timeouts() {
+ local op_rc=$1
+ local crm_attr_name=$2
+ local op_name=$3
+
+ if [ $op_rc -ne 124 -a $op_rc -ne 137 ]; then
+ ocf_run crm_attribute -N $THIS_PCMK_NODE -l reboot --name $crm_attr_name --update 0
+ return 0
+ fi
+
+ local count
+ count=`crm_attribute -N $THIS_PCMK_NODE -l reboot --name $crm_attr_name --query 2>/dev/null | awk '{print $3}' | awk -F "=" '{print $2}' | sed -e '/(null)/d'`
+ if [ $? -ne 0 ]; then
+ # the crm_attribute exited with error. In that case most probably it printed garbage
+ # instead of the number we need. So defensively assume that it is zero.
+
+ count=0
+ fi
+
+ count=$((count+1))
+ # There is a slight chance that this piece of code will be executed twice simultaneously.
+ # As a result, $crm_attr_name's value will be one less than it should be. But we don't need
+ # precise calculation here.
+ ocf_run crm_attribute -N $THIS_PCMK_NODE -l reboot --name $crm_attr_name --update $count
+
+ if [ $count -lt $OCF_RESKEY_max_rabbitmqctl_timeouts ]; then
+ ocf_log warn "${LH} 'rabbitmqctl $op_name' timed out $count of max. $OCF_RESKEY_max_rabbitmqctl_timeouts time(s) in a row. Doing nothing for now."
+ return 1
+ else
+ ocf_log err "${LH} 'rabbitmqctl $op_name' timed out $count of max. $OCF_RESKEY_max_rabbitmqctl_timeouts time(s) in a row and is not responding. The resource is failed."
+ return 2
+ fi
+}
+
+wait_sync() {
+ wait_time=$1
+
+ queues="${COMMAND_TIMEOUT} ${OCF_RESKEY_ctl} list_queues name state"
+ su_rabbit_cmd -t "${wait_time}s" "sh -c \"while $queues | grep -q 'syncing,'; \
+ do sleep 1; done\""
+ return $?
+}
get_monitor() {
local rc=$OCF_ERR_GENERIC
@@ -1157,29 +1271,94 @@ get_monitor() {
fi
fi
+ # Skip all other checks if rabbit app is not running
+ if [ $rabbit_running -ne $OCF_SUCCESS ]; then
+ ocf_log info "${LH} RabbitMQ is not running, get_monitor function ready to return ${rc}"
+ return $rc
+ fi
+
# Check if the rabbitmqctl control plane is alive.
- # The rabbit app may be not running and the command
- # will return > 0, so we only check if the command execution
- # has timed out (which is a code 137 or 124)
+ local rc_alive
+ local timeout_alive
su_rabbit_cmd "${OCF_RESKEY_ctl} list_channels 2>&1 > /dev/null"
- local rc_alive=$?
- if [ $rc_alive -eq 137 -o $rc_alive -eq 124 ]; then
- ocf_log err "${LH} rabbitmqctl is not responding. The resource is failed."
+ rc_alive=$?
+ check_timeouts $rc_alive "rabbit_list_channels_timeouts" "list_channels"
+ timeout_alive=$?
+
+ if [ $timeout_alive -eq 2 ]; then
return $OCF_ERR_GENERIC
+ elif [ $timeout_alive -eq 0 ]; then
+ if [ $rc_alive -ne 0 ]; then
+ ocf_log err "${LH} rabbitmqctl list_channels exited with errors."
+ rc=$OCF_ERR_GENERIC
+ fi
+ fi
+
+ # Check for memory alarms for this Master or Slave node.
+ # If alert found, reset the alarm
+ # and restart the resource as it likely means a dead end situation
+ # when rabbitmq cluster is running with blocked publishing due
+ # to high memory watermark exceeded.
+ local alarms
+ local rc_alarms
+ local timeout_alarms
+ alarms=`su_rabbit_cmd "${OCF_RESKEY_ctl} -q eval 'rabbit_alarm:get_alarms().'"`
+ rc_alarms=$?
+ check_timeouts $rc_alarms "rabbit_get_alarms_timeouts" "get_alarms"
+ timeout_alarms=$?
+
+ if [ $timeout_alarms -eq 2 ]; then
+ return $OCF_ERR_GENERIC
+
+ elif [ $timeout_alarms -eq 0 ]; then
+ if [ $rc_alarms -ne 0 ]; then
+ ocf_log err "${LH} rabbitmqctl get_alarms exited with errors."
+ rc=$OCF_ERR_GENERIC
+
+ elif [ -n "${alarms}" ]; then
+ for node in "${alarms}"; do
+ name=`echo ${node} | perl -n -e "m/memory,'(?<n>\S+)+'/ && print \"$+{n}\n\""`
+ if [ "${name}" = "${RABBITMQ_NODENAME}" ] ; then
+ ocf_log err "${LH} Found raised memory alarm. Erasing the alarm and restarting."
+ su_rabbit_cmd "${OCF_RESKEY_ctl} set_vm_memory_high_watermark 10 2>&1 > /dev/null"
+ rc=$OCF_ERR_GENERIC
+ break
+ fi
+ done
+ fi
fi
# Check if the list of all queues is available,
- # Skip the check if rabbit app is not running yet.
- su_rabbit_cmd "${OCF_RESKEY_ctl} -q list_queues"
- local rc_queues=$?
-
- # If the rabbit app is running,
- # we have to additionally check here if the channels/queues list results were ok.
- if [ $rabbit_running -eq $OCF_SUCCESS ]; then
- # Check if the rabbitmqctl control plane returned no errors for issued requests.
- if [ $rc_alive -ne 0 -o $rc_queues -ne 0 ]; then
- ocf_log err "${LH} rabbitmqctl exited with errors."
+ # Also report some queues stats and total virtual memory.
+ local queues
+ local rc_queues
+ local timeout_queues
+ queues=`su_rabbit_cmd "${OCF_RESKEY_ctl} -q list_queues memory messages consumer_utilisation"`
+ rc_queues=$?
+ check_timeouts $rc_queues "rabbit_list_queues_timeouts" "list_queues"
+ timeout_queues=$?
+
+ if [ $timeout_queues -eq 2 ]; then
+ return $OCF_ERR_GENERIC
+
+ elif [ $timeout_queues -eq 0 ]; then
+ if [ $rc_queues -ne 0 ]; then
+ ocf_log err "${LH} rabbitmqctl list_queues exited with errors."
rc=$OCF_ERR_GENERIC
+
+ elif [ -n "${queues}" ]; then
+ local q_c
+ q_c=`printf "%b\n" "${queues}" | wc -l`
+ local mem
+ mem=`printf "%b\n" "${queues}" | awk -v sum=0 '{sum+=$1} END {print (sum/1048576)}'`
+ local mes
+ mes=`printf "%b\n" "${queues}" | awk -v sum=0 '{sum+=$2} END {print sum}'`
+ local c_u
+ c_u=`printf "%b\n" "${queues}" | awk -v sum=0 -v cnt=${q_c} '{sum+=$3} END {print (sum+1)/(cnt+1)}'`
+ local status
+ status=`echo $(su_rabbit_cmd "${OCF_RESKEY_ctl} -q status")`
+ ocf_log info "${LH} RabbitMQ is running ${q_c} queues consuming ${mem}m of ${TOTALVMEM}m total, with ${mes} queued messages, average consumer utilization ${c_u}"
+ ocf_log info "${LH} RabbitMQ status: ${status}"
fi
fi
@@ -1234,6 +1413,10 @@ action_start() {
ocf_log info "${LH} RMQ prepared for start succesfully."
fi
+ ocf_run crm_attribute -N $THIS_PCMK_NODE -l reboot --name 'rabbit_list_channels_timeouts' --update '0'
+ ocf_run crm_attribute -N $THIS_PCMK_NODE -l reboot --name 'rabbit_get_alarms_timeouts' --update '0'
+ ocf_run crm_attribute -N $THIS_PCMK_NODE -l reboot --name 'rabbit_list_queues_timeouts' --update '0'
+
ocf_log info "${LH} action end."
return $rc
}
@@ -1252,6 +1435,10 @@ action_stop() {
ocf_log info "${LH} action begin."
+ # Wait for synced state first
+ ocf_log info "${LH} waiting $((OCF_RESKEY_stop_time/2)) to sync"
+ wait_sync $((OCF_RESKEY_stop_time/2))
+
# remove master flag
# remove master score
crm_attribute -N $THIS_PCMK_NODE -l reboot --name 'rabbit-master' --delete
@@ -1346,43 +1533,50 @@ action_notify() {
case "$OCF_RESKEY_CRM_meta_notify_operation" in
promote)
ocf_log info "${LH} post-promote begin."
- # Report not running, if the list of nodes being promoted reported empty
+ # Do nothing, if the list of nodes being promoted reported empty.
+ # Delegate recovery, if needed, to the "running out of the cluster" monitor's logic
if [ -z "${OCF_RESKEY_CRM_meta_notify_promote_uname}" ] ; then
- ocf_log warn "${LH} there are no nodes to join to reported on post-promote. The resource will be restarted."
+ ocf_log warn "${LH} there are no nodes to join to reported on post-promote. Nothing to do."
ocf_log info "${LH} post-promote end."
- return $OCF_NOT_RUNNING
+ return $OCF_SUCCESS
fi
# Note, this should fail when the mnesia is inconsistent.
- # For example, when the "old" master processing the promotion of the new one.
+ # For example, when the "old" master processing the promition of the new one.
# Later this ex-master node will rejoin the cluster at post-start.
jjj_join "${OCF_RESKEY_CRM_meta_notify_promote_uname}"
rc=$?
ocf_log info "${LH} post-promote end."
if [ $rc -eq $OCF_ERR_GENERIC ] ; then
ocf_log err "${LH} Failed to join the cluster on post-promote. The resource will be restarted."
- return $OCF_NOT_RUNNING
+ return $OCF_ERR_GENERIC
fi
;;
start)
ocf_log info "${LH} post-start begin."
local nodes_list="${OCF_RESKEY_CRM_meta_notify_start_uname} ${OCF_RESKEY_CRM_meta_notify_active_uname}"
- # Report not running, if the list of nodes being started or running reported empty
+ # Do nothing, if the list of nodes being started or running reported empty
+ # Delegate recovery, if needed, to the "running out of the cluster" monitor's logic
if [ -z "${nodes_list}" ] ; then
- ocf_log warn "${LH} there are no nodes to join to reported on post-promote. The resource will be restarted."
+ ocf_log warn "${LH} I'm a last man standing and I must survive!"
ocf_log info "${LH} post-start end."
- return $OCF_NOT_RUNNING
+ return $OCF_SUCCESS
fi
# check did this event from this host
my_host "${nodes_list}"
rc=$?
- # Report not running, if there is no master reported
+ # Do nothing, if there is no master reported
+ # Delegate recovery, if needed, to the "running out of the cluster" monitor's logic
if [ -z "${OCF_RESKEY_CRM_meta_notify_master_uname}" ] ; then
- ocf_log warn "${LH} there are no nodes to join to reported on post-start. The resource will be restarted."
+ ocf_log warn "${LH} there are no nodes to join to reported on post-start. Nothing to do."
ocf_log info "${LH} post-start end."
- return $OCF_NOT_RUNNING
+ return $OCF_SUCCESS
fi
if [ $rc -eq $OCF_SUCCESS ] ; then
- check_need_join_to "${OCF_RESKEY_CRM_meta_notify_master_uname}"
+ # Now we need to:
+ # a. join to the cluster if we are not joined yet
+ # b. start the RabbitMQ application, which is always
+ # stopped after start action finishes
+ check_need_join_to ${OCF_RESKEY_CRM_meta_notify_master_uname}
rc_join=$?
if [ $rc_join -eq $OCF_SUCCESS ]; then
ocf_log warn "${LH} Going to join node ${OCF_RESKEY_CRM_meta_notify_master_uname}"
@@ -1390,13 +1584,27 @@ action_notify() {
rc2=$?
else
ocf_log warn "${LH} We are already clustered with node ${OCF_RESKEY_CRM_meta_notify_master_uname}"
- rc2=$OCF_SUCCESS
+ if try_to_start_rmq_app; then
+ rc2=$OCF_SUCCESS
+ else
+ rc2=$OCF_ERR_GENERIC
+ fi
fi
ocf_log info "${LH} post-start end."
+ if [ -s "${OCF_RESKEY_definitions_dump_file}" ] ; then
+ ocf_log info "File ${OCF_RESKEY_definitions_dump_file} exists"
+ ocf_run curl -X POST -u $OCF_RESKEY_admin_user:$OCF_RESKEY_admin_password 127.0.0.1:15672/api/definitions --header "Content-Type:application/json" -d @$OCF_RESKEY_definitions_dump_file
+ rc=$?
+ if [ $rc -eq $OCF_SUCCESS ] ; then
+ ocf_log info "RMQ definitions have imported succesfully."
+ else
+ ocf_log err "RMQ definitions have not imported."
+ fi
+ fi
if [ $rc2 -eq $OCF_ERR_GENERIC ] ; then
ocf_log warn "${LH} Failed to join the cluster on post-start. The resource will be restarted."
ocf_log info "${LH} post-start end."
- return $OCF_NOT_RUNNING
+ return $OCF_ERR_GENERIC
fi
fi
;;
@@ -1407,12 +1615,15 @@ action_notify() {
if [ -z "${OCF_RESKEY_CRM_meta_notify_stop_uname}" ] ; then
ocf_log warn "${LH} there are no nodes being stopped reported on post-stop. The resource will be restarted."
ocf_log info "${LH} post-stop end."
- return $OCF_NOT_RUNNING
+ return $OCF_ERR_GENERIC
fi
my_host "${OCF_RESKEY_CRM_meta_notify_stop_uname}"
rc=$?
if [ $rc -ne $OCF_SUCCESS ] ; then
- # On ohter nodes processing the post-stop, make sure the stopped node will be forgotten
+ # Wait for synced state first
+ ocf_log info "${LH} waiting $((OCF_RESKEY_stop_time/2)) to sync"
+ wait_sync $((OCF_RESKEY_stop_time/2))
+ # On other nodes processing the post-stop, make sure the stopped node will be forgotten
unjoin_nodes_from_cluster "${OCF_RESKEY_CRM_meta_notify_stop_uname}"
else
# On the nodes being stopped, reset the master score
@@ -1429,7 +1640,7 @@ action_notify() {
if [ -z "${OCF_RESKEY_CRM_meta_notify_demote_uname}" ] ; then
ocf_log warn "${LH} there are no nodes being demoted reported on post-demote. The resource will be restarted."
ocf_log info "${LH} post-demote end."
- return $OCF_NOT_RUNNING
+ return $OCF_ERR_GENERIC
fi
my_host "${OCF_RESKEY_CRM_meta_notify_demote_uname}"
rc=$?
@@ -1437,6 +1648,9 @@ action_notify() {
# On ohter nodes processing the post-demote, make sure the demoted node will be forgotten
unjoin_nodes_from_cluster "${OCF_RESKEY_CRM_meta_notify_demote_uname}"
else
+ # Wait for synced state first
+ ocf_log info "${LH} waiting $((OCF_RESKEY_stop_time/2)) to sync"
+ wait_sync $((OCF_RESKEY_stop_time/2))
# On the nodes being demoted, reset the master score
ocf_log info "${LH} resetting the master score."
master_score 0
@@ -1577,6 +1791,11 @@ action_demote() {
"$OCF_RUNNING_MASTER")
# Running as master. Normal, expected behavior.
ocf_log warn "${LH} Resource is currently running as Master"
+
+ # Wait for synced state first
+ ocf_log info "${LH} waiting $((OCF_RESKEY_stop_time/2)) to sync"
+ wait_sync $((OCF_RESKEY_stop_time/2))
+
stop_rmq_server_app
rc=$?
crm_attribute -N $THIS_PCMK_NODE -l reboot --name 'rabbit-master' --delete
diff --git a/src/rabbit.erl b/src/rabbit.erl
index d11b8d95a5..7c85685276 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -459,7 +459,8 @@ status() ->
{uptime, begin
{T,_} = erlang:statistics(wall_clock),
T div 1000
- end}],
+ end},
+ {kernel, {net_ticktime, net_kernel:get_net_ticktime()}}],
S1 ++ S2 ++ S3 ++ S4.
alarms() ->
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index b8997faea5..849efa3611 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -287,10 +287,10 @@ promote_slave([SPid | SPids]) ->
{SPid, SPids}.
initial_queue_node(Q, DefNode) ->
- {MNode, _SNodes} = suggested_queue_nodes(Q, DefNode, all_nodes()),
+ {MNode, _SNodes} = suggested_queue_nodes(Q, DefNode, rabbit_nodes:all_running()),
MNode.
-suggested_queue_nodes(Q) -> suggested_queue_nodes(Q, all_nodes()).
+suggested_queue_nodes(Q) -> suggested_queue_nodes(Q, rabbit_nodes:all_running()).
suggested_queue_nodes(Q, All) -> suggested_queue_nodes(Q, node(), All).
%% The third argument exists so we can pull a call to
@@ -312,8 +312,6 @@ suggested_queue_nodes(Q = #amqqueue{exclusive_owner = Owner}, DefNode, All) ->
_ -> {MNode, []}
end.
-all_nodes() -> rabbit_mnesia:cluster_nodes(running).
-
policy(Policy, Q) ->
case rabbit_policy:get(Policy, Q) of
undefined -> none;
diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl
index 534ef1afad..62fc718f79 100644
--- a/src/rabbit_mirror_queue_sync.erl
+++ b/src/rabbit_mirror_queue_sync.erl
@@ -352,13 +352,46 @@ batch_publish(Batch, MA, BQ, BQS) ->
BQS1 = BQ:batch_publish(Batch, none, noflow, BQS),
{MA, BQS1}.
+%% TODO
+%%
+%% The case clause in this function assumes that we are either dealing
+%% with a backing_queue that returns acktags as integers, or a
+%% priority queue.
+%% A possible fix to this would be to add a function
+%% to the BQ API where we pass a list of messages and acktags and the
+%% BQ implementation knows how to zip them together.
batch_publish_delivered(Batch, MA, BQ, BQS) ->
{AckTags, BQS1} = BQ:batch_publish_delivered(Batch, none, noflow, BQS),
MA1 =
- lists:foldl(fun ({{Msg, _}, AckTag}, MAs) ->
- [{Msg#basic_message.id, AckTag} | MAs]
- end, MA, lists:zip(Batch, AckTags)),
+ case hd(AckTags) of
+ HeadTag when is_integer(HeadTag) ->
+ lists:foldl(fun ({{Msg, _}, AckTag}, MAs) ->
+ [{msg_id(Msg), AckTag} | MAs]
+ end, MA, lists:zip(Batch, AckTags));
+ _AckTags ->
+ %% priority queue processing of acktags
+ BatchByPriority = batch_by_priority(Batch),
+ lists:foldl(fun (Acks, MAs) ->
+ {P, _AckTag} = hd(Acks),
+ Pubs = orddict:fetch(P, BatchByPriority),
+ MAs0 = zip_msgs_and_tags(Pubs, Acks),
+ MAs ++ MAs0
+ end, MA, AckTags)
+ end,
{MA1, BQS1}.
+batch_by_priority(Batch) ->
+ rabbit_priority_queue:partition_publish_delivered_batch(Batch).
+
+zip_msgs_and_tags(Pubs, AckTags) ->
+ lists:zipwith(
+ fun (Pub, AckTag) ->
+ {Msg, _Props} = Pub,
+ {msg_id(Msg), AckTag}
+ end, Pubs, AckTags).
+
props(Props) ->
Props#message_properties{needs_confirming = false}.
+
+msg_id(#basic_message{ id = Id }) ->
+ Id.
diff --git a/src/rabbit_nodes.erl b/src/rabbit_nodes.erl
index 090aacc63c..57d971715b 100644
--- a/src/rabbit_nodes.erl
+++ b/src/rabbit_nodes.erl
@@ -18,7 +18,8 @@
-export([names/1, diagnostics/1, make/1, parts/1, cookie_hash/0,
is_running/2, is_process_running/2,
- cluster_name/0, set_cluster_name/1, ensure_epmd/0]).
+ cluster_name/0, set_cluster_name/1, ensure_epmd/0,
+ all_running/0]).
-include_lib("kernel/include/inet.hrl").
@@ -42,6 +43,7 @@
-spec(cluster_name/0 :: () -> binary()).
-spec(set_cluster_name/1 :: (binary()) -> 'ok').
-spec(ensure_epmd/0 :: () -> 'ok').
+-spec(all_running/0 :: () -> [node()]).
-endif.
@@ -215,3 +217,5 @@ port_shutdown_loop(Port) ->
{Port, {exit_status, _Rc}} -> ok;
{Port, _} -> port_shutdown_loop(Port)
end.
+
+all_running() -> rabbit_mnesia:cluster_nodes(running).
diff --git a/src/rabbit_priority_queue.erl b/src/rabbit_priority_queue.erl
index e3fdddf0ca..46a3991d88 100644
--- a/src/rabbit_priority_queue.erl
+++ b/src/rabbit_priority_queue.erl
@@ -42,6 +42,9 @@
handle_pre_hibernate/1, resume/1, msg_rates/1,
info/2, invoke/3, is_duplicate/2, set_queue_mode/2]).
+%% for rabbit_mirror_queue_sync.
+-export([partition_publish_delivered_batch/1]).
+
-record(state, {bq, bqss}).
-record(passthrough, {bq, bqs}).
@@ -205,8 +208,7 @@ publish(Msg, MsgProps, IsDelivered, ChPid, Flow,
?passthrough1(publish(Msg, MsgProps, IsDelivered, ChPid, Flow, BQS)).
batch_publish(Publishes, ChPid, Flow, State = #state{bq = BQ}) ->
- PubDict = publishes_by_priority(
- Publishes, fun ({Msg, _, _}) -> Msg end),
+ PubDict = partition_publish_batch(Publishes),
lists:foldl(
fun ({Priority, Pubs}, St) ->
pick1(fun (_P, BQSN) ->
@@ -228,8 +230,7 @@ publish_delivered(Msg, MsgProps, ChPid, Flow,
?passthrough2(publish_delivered(Msg, MsgProps, ChPid, Flow, BQS)).
batch_publish_delivered(Publishes, ChPid, Flow, State = #state{bq = BQ}) ->
- PubDict = publishes_by_priority(
- Publishes, fun ({Msg, _}) -> Msg end),
+ PubDict = partition_publish_delivered_batch(Publishes),
{PrioritiesAndAcks, State1} =
lists:foldl(
fun ({Priority, Pubs}, {PriosAndAcks, St}) ->
@@ -238,7 +239,7 @@ batch_publish_delivered(Publishes, ChPid, Flow, State = #state{bq = BQ}) ->
{AckTags, BQSN1} =
BQ:batch_publish_delivered(
Pubs, ChPid, Flow, BQSN),
- {{P, AckTags}, BQSN1}
+ {priority_on_acktags(P, AckTags), BQSN1}
end, Priority, St),
{[PriosAndAcks1 | PriosAndAcks], St1}
end, {[], State}, orddict:to_list(PubDict)),
@@ -571,7 +572,15 @@ a(State = #state{bqss = BQSs}) ->
end.
%%----------------------------------------------------------------------------
-publishes_by_priority(Publishes, ExtractMsg) ->
+partition_publish_batch(Publishes) ->
+ partition_publishes(
+ Publishes, fun ({Msg, _, _}) -> Msg end).
+
+partition_publish_delivered_batch(Publishes) ->
+ partition_publishes(
+ Publishes, fun ({Msg, _}) -> Msg end).
+
+partition_publishes(Publishes, ExtractMsg) ->
lists:foldl(fun (Pub, Dict) ->
Msg = ExtractMsg(Pub),
rabbit_misc:orddict_cons(priority2(Msg), Pub, Dict)
diff --git a/src/rabbit_queue_decorator.erl b/src/rabbit_queue_decorator.erl
index 129f51d099..0c6f0820c7 100644
--- a/src/rabbit_queue_decorator.erl
+++ b/src/rabbit_queue_decorator.erl
@@ -42,7 +42,7 @@
-export([behaviour_info/1]).
behaviour_info(callbacks) ->
- [{description, 0}, {startup, 1}, {shutdown, 1}, {policy_changed, 2},
+ [{startup, 1}, {shutdown, 1}, {policy_changed, 2},
{active_for, 1}, {consumer_state_changed, 3}];
behaviour_info(_Other) ->
undefined.