summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Makefile2
-rw-r--r--scripts/rabbitmq-env.bat8
-rwxr-xr-xscripts/rabbitmq-server-ha.ocf84
-rw-r--r--src/rabbit_amqqueue_process.erl45
-rw-r--r--src/rabbit_error_logger.erl4
-rw-r--r--src/rabbit_lager.erl3
-rw-r--r--src/rabbit_log.erl12
7 files changed, 109 insertions, 49 deletions
diff --git a/Makefile b/Makefile
index 4bf2b7b650..dfb7555d79 100644
--- a/Makefile
+++ b/Makefile
@@ -6,8 +6,6 @@ PACKAGES_DIR ?= $(abspath PACKAGES)
DEPS = ranch lager $(PLUGINS)
-dep_lager = git https://github.com/rabbitmq/lager.git master
-
define usage_xml_to_erl
$(subst __,_,$(patsubst $(DOCS_DIR)/rabbitmq%.1.xml, src/rabbit_%_usage.erl, $(subst -,_,$(1))))
endef
diff --git a/scripts/rabbitmq-env.bat b/scripts/rabbitmq-env.bat
index 23477ba75a..ee89ac5605 100644
--- a/scripts/rabbitmq-env.bat
+++ b/scripts/rabbitmq-env.bat
@@ -234,10 +234,10 @@ if "!RABBITMQ_PLUGINS_EXPAND_DIR!"=="" (
set RABBITMQ_PLUGINS_EXPAND_DIR=!PLUGINS_EXPAND_DIR!
)
)
-if not exist "!RABBITMQ_PLUGINS_EXPAND_DIR!" (
- mkdir "!RABBITMQ_PLUGINS_EXPAND_DIR!"
-)
-for /f "delims=" %%F in ("!RABBITMQ_PLUGINS_EXPAND_DIR!") do set RABBITMQ_PLUGINS_EXPAND_DIR=%%~sF
+REM FIXME: RabbitMQ removes and recreates RABBITMQ_PLUGINS_EXPAND_DIR
+REM itself. Therefore we can't create it here in advance and escape the
+REM directory name, and RABBITMQ_PLUGINS_EXPAND_DIR must not contain
+REM non-US-ASCII characters.
REM [ "x" = "x$RABBITMQ_ENABLED_PLUGINS_FILE" ] && RABBITMQ_ENABLED_PLUGINS_FILE=${ENABLED_PLUGINS_FILE}
if "!RABBITMQ_ENABLED_PLUGINS_FILE!"=="" (
diff --git a/scripts/rabbitmq-server-ha.ocf b/scripts/rabbitmq-server-ha.ocf
index d1088bc42d..5505c10581 100755
--- a/scripts/rabbitmq-server-ha.ocf
+++ b/scripts/rabbitmq-server-ha.ocf
@@ -407,8 +407,8 @@ proc_kill()
# OCF_SUCCESS
# LL
# Arguments:
-# $1 - pidfile or pid
-# $2 - service name used for logging
+# $1 - pidfile or pid or 'none', if stopping by the name matching
+# $2 - service name used for logging or for the failback stopping method
# $3 - stop process timeout (in sec), used to determine how many times we try
# SIGTERM and an upper limit on how long this function should try and
# stop the process. Defaults to 15.
@@ -425,16 +425,20 @@ proc_stop()
local i
local pid
local pidfile
- # check if provide just a number
- echo "${pid_param}" | egrep -q '^[0-9]+$'
- if [ $? -eq 0 ]; then
- pid="${pid_param}"
- elif [ -e "${pid_param}" ]; then # check if passed in a pid file
- pidfile="${pid_param}"
- pid=$(cat "${pidfile}" 2>/dev/null | tr -s " " "\n" | sort -u)
- else
- ocf_log warn "${LH} pid param ${pid_param} is not a file or a number, try match by ${service_name}"
+ if [ "${pid_param}" = "none" ] ; then
pid="none"
+ else
+ # check if provide just a number
+ echo "${pid_param}" | egrep -q '^[0-9]+$'
+ if [ $? -eq 0 ]; then
+ pid="${pid_param}"
+ elif [ -e "${pid_param}" ]; then # check if passed in a pid file
+ pidfile="${pid_param}"
+ pid=$(cat "${pidfile}" 2>/dev/null | tr -s " " "\n" | sort -u)
+ else
+ ocf_log warn "${LH} pid param ${pid_param} is not a file or a number, try match by ${service_name}"
+ pid="none"
+ fi
fi
# number of times to try a SIGTEM is (timeout - 5 seconds) / 2 seconds
local stop_count=$(( ($timeout-5)/2 ))
@@ -790,10 +794,14 @@ update_cookie() {
return $OCF_SUCCESS
}
-# Stop rmq beam process by pid or rabbit node name match. Returns SUCCESS/ERROR
+# Stop rmq beam process by pid and by rabbit node name match. Returns SUCCESS/ERROR
kill_rmq_and_remove_pid() {
local LH="${LL} kill_rmq_and_remove_pid():"
+ # Stop the rabbitmq-server by its pidfile, use the name matching as a fallback,
+ # and ignore the exit code
proc_stop "${OCF_RESKEY_pid_file}" "beam.*${RABBITMQ_NODENAME}" "${OCF_RESKEY_stop_time}"
+ # Ensure the beam.smp stopped by the rabbit node name matching as well
+ proc_stop none "beam.*${RABBITMQ_NODENAME}" "${OCF_RESKEY_stop_time}"
if [ $? -eq 0 ] ; then
return $OCF_SUCCESS
else
@@ -967,9 +975,11 @@ stop_server_process() {
[ $? -eq 0 ] && ocf_log info "${LH} RMQ-server process (PID=${pid}) stopped succesfully."
fi
- if [ -f ${OCF_RESKEY_pid_file} ] ; then
- # Ensure there is no beam process and pidfile left
- ocf_log warn "${LH} The pidfile still exists, forcing the RMQ-server cleanup"
+ # Ensure there is no beam process and pidfile left
+ pgrep -f "beam.*${RABBITMQ_NODENAME}" > /dev/null
+ rc=$?
+ if [ -f ${OCF_RESKEY_pid_file} -o $rc -eq 0 ] ; then
+ ocf_log warn "${LH} The pidfile or beam's still exist, forcing the RMQ-server cleanup"
kill_rmq_and_remove_pid
fi
@@ -1399,34 +1409,32 @@ get_monitor() {
if [ $rabbit_running -eq $OCF_SUCCESS ]
then
ocf_log info "${LH} rabbit app is running. checking if we are the part of healthy cluster"
- rc_check=$OCF_ERR_GENERIC
- nodelist=$(get_alive_pacemaker_nodes_but)
- for node in $nodelist
- do
- status_master=1
- # Do not refetch the master status for *this* node as we know it already
- if [ $rc -ne $OCF_RUNNING_MASTER ] ; then
+
+ if [ $rc -eq $OCF_RUNNING_MASTER ] ; then
+ # The master is always running inside of its cluster
+ ocf_log info "${LH} rabbit app is running and is master of cluster"
+ rc_check=$OCF_SUCCESS
+ else
+ rc_check=$OCF_ERR_GENERIC
+ nodelist=$(get_alive_pacemaker_nodes_but)
+ for node in $nodelist
+ do
ocf_log info "${LH} rabbit app is running. looking for master on $node"
is_master $node
status_master=$?
ocf_log info "${LH} fetched master attribute for $node. attr value is ${status_master}"
- else
- # The master is always running inside of its cluster
- ocf_log info "${LH} rabbit app is running and is member of healthy cluster"
- rc_check=$OCF_SUCCESS
- break
- fi
- if [ $status_master -eq 0 ] ; then
- ocf_log info "${LH} rabbit app is running. master is $node"
- if get_running_nodes | grep -q $(rabbit_node_name $node)
- then
- ocf_log info "${LH} rabbit app is running and is member of healthy cluster"
- rc_check=$OCF_SUCCESS
- break
+ if [ $status_master -eq 0 ] ; then
+ ocf_log info "${LH} rabbit app is running. master is $node"
+ if get_running_nodes | grep -q $(rabbit_node_name $node)
+ then
+ ocf_log info "${LH} rabbit app is running and is member of healthy cluster"
+ rc_check=$OCF_SUCCESS
+ break
+ fi
fi
- fi
- done
- [ $rc_check -eq $OCF_ERR_GENERIC ] && ocf_log err "${LH} rabbit node is running out of the cluster"
+ done
+ [ $rc_check -eq $OCF_ERR_GENERIC ] && ocf_log err "${LH} rabbit node is running out of the cluster"
+ fi
else
if [ "$OCF_CHECK_LEVEL" -gt 20 ]; then
ocf_log info "${LH} rabbit app is not running. checking if there is a master"
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 1a86851d0a..f79304632e 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -142,6 +142,7 @@ init_it(Recover, From, State = #q{q = #amqqueue{exclusive_owner = Owner}}) ->
{_, Terms} = recovery_status(Recover),
BQS = bq_init(BQ, Q, Terms),
%% Rely on terminate to delete the queue.
+ log_delete_exclusive(Owner, State),
{stop, {shutdown, missing_owner},
State#q{backing_queue = BQ, backing_queue_state = BQS}}
end.
@@ -701,7 +702,13 @@ handle_ch_down(DownPid, State = #q{consumers = Consumers,
exclusive_consumer = Holder1},
notify_decorators(State2),
case should_auto_delete(State2) of
- true -> {stop, State2};
+ true ->
+ log_auto_delete(
+ io_lib:format(
+ "because all of its consumers (~p) were on a channel that was closed",
+ [length(ChCTags)]),
+ State),
+ {stop, State2};
false -> {ok, requeue_and_run(ChAckTags,
ensure_expiry_timer(State2))}
end
@@ -939,6 +946,7 @@ prioritise_call(Msg, _From, _Len, State) ->
prioritise_cast(Msg, _Len, State) ->
case Msg of
delete_immediately -> 8;
+ {delete_exclusive, _Pid} -> 8;
{set_ram_duration_target, _Duration} -> 8;
{set_maximum_since_use, _Age} -> 8;
{run_backing_queue, _Mod, _Fun} -> 6;
@@ -1063,7 +1071,13 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
notify_decorators(State1),
case should_auto_delete(State1) of
false -> reply(ok, ensure_expiry_timer(State1));
- true -> stop(ok, State1)
+ true ->
+ log_auto_delete(
+ io_lib:format(
+ "because its last consumer with tag '~s' was cancelled",
+ [ConsumerTag]),
+ State),
+ stop(ok, State1)
end
end;
@@ -1165,6 +1179,10 @@ handle_cast({reject, false, AckTags, ChPid}, State) ->
end) end,
fun () -> ack(AckTags, ChPid, State) end));
+handle_cast({delete_exclusive, ConnPid}, State) ->
+ log_delete_exclusive(ConnPid, State),
+ stop(State);
+
handle_cast(delete_immediately, State) ->
stop(State);
@@ -1284,6 +1302,7 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason},
%% match what people expect (see bug 21824). However we need this
%% monitor-and-async- delete in case the connection goes away
%% unexpectedly.
+ log_delete_exclusive(DownPid, State),
stop(State);
handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) ->
@@ -1347,3 +1366,25 @@ handle_pre_hibernate(State = #q{backing_queue = BQ,
{hibernate, stop_rate_timer(State1)}.
format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ).
+
+log_delete_exclusive({ConPid, ConRef}, State) ->
+ log_delete_exclusive(ConPid, State);
+log_delete_exclusive(ConPid, #q{ q = #amqqueue{ name = Resource } }) ->
+ #resource{ name = QName, virtual_host = VHost } = Resource,
+ rabbit_queue:debug("Deleting exclusive queue '~s' in vhost '~s' " ++
+ " because its declaring connection ~p was closed",
+ [QName, VHost, ConPid]).
+
+log_auto_delete(Reason, #q{ q = #amqqueue{ name = Resource } }) ->
+ #resource{ name = QName, virtual_host = VHost } = Resource,
+ rabbit_queue:debug("Deleting auto-delete queue '~s' in vhost '~s' " ++
+ Reason,
+ [QName, VHost]).
+
+
+
+
+
+
+
+
diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl
index e8b7ce5669..efe8495299 100644
--- a/src/rabbit_error_logger.erl
+++ b/src/rabbit_error_logger.erl
@@ -103,10 +103,12 @@ publish1(RoutingKey, Format, Data, LogExch) ->
Timestamp = time_compat:os_system_time(seconds),
Args = [truncate:term(A, ?LOG_TRUNC) || A <- Data],
+ Headers = [{<<"node">>, longstr, list_to_binary(atom_to_list(node()))}],
{ok, _DeliveredQPids} =
rabbit_basic:publish(LogExch, RoutingKey,
#'P_basic'{content_type = <<"text/plain">>,
- timestamp = Timestamp},
+ timestamp = Timestamp,
+ headers = Headers},
list_to_binary(io_lib:format(Format, Args))),
ok.
diff --git a/src/rabbit_lager.erl b/src/rabbit_lager.erl
index 6ae9c10a5e..8beee10846 100644
--- a/src/rabbit_lager.erl
+++ b/src/rabbit_lager.erl
@@ -210,7 +210,8 @@ configure_lager() ->
%% messages to the default sink. To know the list of expected extra
%% sinks, we look at the 'lager_extra_sinks' compilation option.
Sinks0 = application:get_env(lager, extra_sinks, []),
- Sinks1 = configure_extra_sinks(Sinks0, list_expected_sinks()),
+ Sinks1 = configure_extra_sinks(Sinks0,
+ [error_logger | list_expected_sinks()]),
%% TODO Waiting for basho/lager#303
%% Sinks2 = lists:keystore(error_logger_lager_event, 1, Sinks1,
%% {error_logger_lager_event,
diff --git a/src/rabbit_log.erl b/src/rabbit_log.erl
index 5eedd925fe..a22dcbb6f0 100644
--- a/src/rabbit_log.erl
+++ b/src/rabbit_log.erl
@@ -74,10 +74,20 @@ log(Category, Level, Fmt) -> log(Category, Level, Fmt, []).
log(Category, Level, Fmt, Args) when is_list(Args) ->
Sink = case Category of
default -> ?LAGER_SINK;
- _ -> lager_util:make_internal_sink_name(Category)
+ _ -> make_internal_sink_name(Category)
end,
lager:log(Sink, Level, self(), Fmt, Args).
+make_internal_sink_name(Category) when Category == channel;
+ Category == connection;
+ Category == mirroring;
+ Category == queue;
+ Category == federation ->
+ lager_util:make_internal_sink_name(list_to_atom("rabbit_" ++
+ atom_to_list(Category)));
+make_internal_sink_name(Category) ->
+ lager_util:make_internal_sink_name(Category).
+
debug(Format) -> debug(Format, []).
debug(Format, Args) -> debug(self(), Format, Args).
debug(Metadata, Format, Args) ->