summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2012-05-15 16:24:27 +0100
committerSimon MacMullen <simon@rabbitmq.com>2012-05-15 16:24:27 +0100
commit01c649e6ef942271d12f6a7620ca2fdbb8c8464f (patch)
treeec3b37880f79ecd1e517ab7e6b5e84b40c9c4cb8
parent48ed9f2a5bbc39ef752c7d698416b1d368c7322e (diff)
parentd0652f278fc17ed260738b51d6f49a0598d0dd2f (diff)
downloadrabbitmq-server-git-01c649e6ef942271d12f6a7620ca2fdbb8c8464f.tar.gz
Merge default
-rw-r--r--Makefile2
-rw-r--r--packaging/RPMS/Fedora/Makefile2
-rw-r--r--packaging/RPMS/Fedora/rabbitmq-server.init (renamed from packaging/common/rabbitmq-server.init)4
-rw-r--r--packaging/debs/Debian/Makefile8
-rw-r--r--packaging/debs/Debian/debian/rabbitmq-server.default9
-rw-r--r--packaging/debs/Debian/debian/rabbitmq-server.init185
-rw-r--r--packaging/debs/Debian/debian/rules1
-rw-r--r--src/mirrored_supervisor.erl45
-rw-r--r--src/mirrored_supervisor_tests.erl18
-rw-r--r--src/rabbit.erl22
-rw-r--r--src/rabbit_amqqueue_process.erl96
-rw-r--r--src/rabbit_backing_queue.erl17
-rw-r--r--src/rabbit_backing_queue_qc.erl7
-rw-r--r--src/rabbit_basic.erl5
-rw-r--r--src/rabbit_channel.erl41
-rw-r--r--src/rabbit_control.erl46
-rw-r--r--src/rabbit_disk_monitor.erl6
-rw-r--r--src/rabbit_event.erl6
-rw-r--r--src/rabbit_limiter.erl2
-rw-r--r--src/rabbit_mirror_queue_coordinator.erl2
-rw-r--r--src/rabbit_mirror_queue_master.erl21
-rw-r--r--src/rabbit_mirror_queue_slave.erl35
-rw-r--r--src/rabbit_net.erl20
-rw-r--r--src/rabbit_nodes.erl9
-rw-r--r--src/rabbit_plugins.erl18
-rw-r--r--src/rabbit_reader.erl8
-rw-r--r--src/rabbit_runtime_parameters.erl4
-rw-r--r--src/rabbit_tests.erl16
-rw-r--r--src/rabbit_variable_queue.erl45
-rw-r--r--src/supervisor2.erl4
30 files changed, 476 insertions, 228 deletions
diff --git a/Makefile b/Makefile
index db7462a616..49bf926afc 100644
--- a/Makefile
+++ b/Makefile
@@ -207,7 +207,7 @@ start-background-node: all
-rm -f $(RABBITMQ_MNESIA_DIR).pid
mkdir -p $(RABBITMQ_MNESIA_DIR)
setsid sh -c "$(MAKE) run-background-node > $(RABBITMQ_MNESIA_DIR)/startup_log 2> $(RABBITMQ_MNESIA_DIR)/startup_err" &
- sleep 1
+ ./scripts/rabbitmqctl -n $(RABBITMQ_NODENAME) wait $(RABBITMQ_MNESIA_DIR).pid kernel
start-rabbit-on-node: all
echo "rabbit:start()." | $(ERL_CALL)
diff --git a/packaging/RPMS/Fedora/Makefile b/packaging/RPMS/Fedora/Makefile
index 234fc2c7d7..180500ed38 100644
--- a/packaging/RPMS/Fedora/Makefile
+++ b/packaging/RPMS/Fedora/Makefile
@@ -32,8 +32,8 @@ prepare:
SPECS/rabbitmq-server.spec
cp ${COMMON_DIR}/* SOURCES/
+ cp rabbitmq-server.init SOURCES/rabbitmq-server.init
sed -i \
- -e 's|^LOCK_FILE=.*$$|LOCK_FILE=/var/lock/subsys/$$NAME|' \
-e 's|^START_PROG=.*$$|START_PROG="$(START_PROG)"|' \
SOURCES/rabbitmq-server.init
ifeq "$(RPM_OS)" "fedora"
diff --git a/packaging/common/rabbitmq-server.init b/packaging/RPMS/Fedora/rabbitmq-server.init
index c942f8e314..2d2680e3b2 100644
--- a/packaging/common/rabbitmq-server.init
+++ b/packaging/RPMS/Fedora/rabbitmq-server.init
@@ -27,7 +27,7 @@ INIT_LOG_DIR=/var/log/rabbitmq
PID_FILE=/var/run/rabbitmq/pid
START_PROG= # Set when building package
-LOCK_FILE= # Set when building package
+LOCK_FILE=/var/lock/subsys/$NAME
test -x $DAEMON || exit 0
test -x $CONTROL || exit 0
@@ -35,6 +35,8 @@ test -x $CONTROL || exit 0
RETVAL=0
set -e
+[ -f /etc/default/${NAME} ] && . /etc/default/${NAME}
+
ensure_pid_dir () {
PID_DIR=`dirname ${PID_FILE}`
if [ ! -d ${PID_DIR} ] ; then
diff --git a/packaging/debs/Debian/Makefile b/packaging/debs/Debian/Makefile
index 2a738f6e7a..844388c6f4 100644
--- a/packaging/debs/Debian/Makefile
+++ b/packaging/debs/Debian/Makefile
@@ -22,14 +22,6 @@ package: clean
tar -zxf $(DEBIAN_ORIG_TARBALL)
cp -r debian $(UNPACKED_DIR)
cp $(COMMON_DIR)/* $(UNPACKED_DIR)/debian/
-# Debian and descendants differ from most other distros in that
-# runlevel 2 should start network services.
- sed -i \
- -e 's|^LOCK_FILE=.*$$|LOCK_FILE=|' \
- -e 's|^START_PROG=.*$$|START_PROG="start-stop-daemon -v --chuid rabbitmq --start --exec"|' \
- -e 's|^\(# Default-Start:\).*$$|\1 2 3 4 5|' \
- -e 's|^\(# Default-Stop:\).*$$|\1 0 1 6|' \
- $(UNPACKED_DIR)/debian/rabbitmq-server.init
sed -i -e 's|@SU_RABBITMQ_SH_C@|su rabbitmq -s /bin/sh -c|' \
$(UNPACKED_DIR)/debian/rabbitmq-script-wrapper
chmod a+x $(UNPACKED_DIR)/debian/rules
diff --git a/packaging/debs/Debian/debian/rabbitmq-server.default b/packaging/debs/Debian/debian/rabbitmq-server.default
new file mode 100644
index 0000000000..bde5e30895
--- /dev/null
+++ b/packaging/debs/Debian/debian/rabbitmq-server.default
@@ -0,0 +1,9 @@
+# This file is sourced by /etc/init.d/rabbitmq-server. Its primary
+# reason for existing is to allow adjustment of system limits for the
+# rabbitmq-server process.
+#
+# Maximum number of open file handles. This will need to be increased
+# to handle many simultaneous connections. Refer to the system
+# documentation for ulimit (in man bash) for more information.
+#
+#ulimit -n 1024
diff --git a/packaging/debs/Debian/debian/rabbitmq-server.init b/packaging/debs/Debian/debian/rabbitmq-server.init
new file mode 100644
index 0000000000..f514b9744b
--- /dev/null
+++ b/packaging/debs/Debian/debian/rabbitmq-server.init
@@ -0,0 +1,185 @@
+#!/bin/sh
+#
+# rabbitmq-server RabbitMQ broker
+#
+# chkconfig: - 80 05
+# description: Enable AMQP service provided by RabbitMQ
+#
+
+### BEGIN INIT INFO
+# Provides: rabbitmq-server
+# Required-Start: $remote_fs $network
+# Required-Stop: $remote_fs $network
+# Default-Start: 2 3 4 5
+# Default-Stop: 0 1 6
+# Description: RabbitMQ broker
+# Short-Description: Enable AMQP service provided by RabbitMQ broker
+### END INIT INFO
+
+PATH=/sbin:/usr/sbin:/bin:/usr/bin
+NAME=rabbitmq-server
+DAEMON=/usr/sbin/${NAME}
+CONTROL=/usr/sbin/rabbitmqctl
+DESC="message broker"
+USER=rabbitmq
+ROTATE_SUFFIX=
+INIT_LOG_DIR=/var/log/rabbitmq
+PID_FILE=/var/run/rabbitmq/pid
+
+
+test -x $DAEMON || exit 0
+test -x $CONTROL || exit 0
+
+RETVAL=0
+set -e
+
+[ -f /etc/default/${NAME} ] && . /etc/default/${NAME}
+
+. /lib/lsb/init-functions
+. /lib/init/vars.sh
+
+ensure_pid_dir () {
+ PID_DIR=`dirname ${PID_FILE}`
+ if [ ! -d ${PID_DIR} ] ; then
+ mkdir -p ${PID_DIR}
+ chown -R ${USER}:${USER} ${PID_DIR}
+ chmod 755 ${PID_DIR}
+ fi
+}
+
+remove_pid () {
+ rm -f ${PID_FILE}
+ rmdir `dirname ${PID_FILE}` || :
+}
+
+start_rabbitmq () {
+ status_rabbitmq quiet
+ if [ $RETVAL != 0 ] ; then
+ RETVAL=0
+ ensure_pid_dir
+ set +e
+ RABBITMQ_PID_FILE=$PID_FILE start-stop-daemon --quiet \
+ --chuid rabbitmq --start --exec $DAEMON \
+ --pidfile "$RABBITMQ_PID_FILE" \
+ > "${INIT_LOG_DIR}/startup_log" \
+ 2> "${INIT_LOG_DIR}/startup_err" \
+ 0<&- &
+ $CONTROL wait $PID_FILE >/dev/null 2>&1
+ RETVAL=$?
+ set -e
+ if [ $RETVAL != 0 ] ; then
+ remove_pid
+ fi
+ else
+ RETVAL=3
+ fi
+}
+
+stop_rabbitmq () {
+ status_rabbitmq quiet
+ if [ $RETVAL = 0 ] ; then
+ set +e
+ $CONTROL stop ${PID_FILE} > ${INIT_LOG_DIR}/shutdown_log 2> ${INIT_LOG_DIR}/shutdown_err
+ RETVAL=$?
+ set -e
+ if [ $RETVAL = 0 ] ; then
+ remove_pid
+ fi
+ else
+ RETVAL=3
+ fi
+}
+
+status_rabbitmq() {
+ set +e
+ if [ "$1" != "quiet" ] ; then
+ $CONTROL status 2>&1
+ else
+ $CONTROL status > /dev/null 2>&1
+ fi
+ if [ $? != 0 ] ; then
+ RETVAL=3
+ fi
+ set -e
+}
+
+rotate_logs_rabbitmq() {
+ set +e
+ $CONTROL -q rotate_logs ${ROTATE_SUFFIX}
+ if [ $? != 0 ] ; then
+ RETVAL=1
+ fi
+ set -e
+}
+
+restart_running_rabbitmq () {
+ status_rabbitmq quiet
+ if [ $RETVAL = 0 ] ; then
+ restart_rabbitmq
+ else
+ log_warning_msg "${DESC} not running"
+ fi
+}
+
+restart_rabbitmq() {
+ stop_rabbitmq
+ start_rabbitmq
+}
+
+restart_end() {
+ if [ $RETVAL = 0 ] ; then
+ log_end_msg 0
+ else
+ log_end_msg 1
+ fi
+}
+
+start_stop_end() {
+ case "$RETVAL" in
+ 0)
+ log_end_msg 0;;
+ 3)
+ log_warning_msg "${DESC} already ${1}"
+ log_end_msg 0;;
+ *)
+ log_warning_msg "FAILED - check ${INIT_LOG_DIR}/startup_\{log, _err\}"
+ log_end_msg 1;;
+ esac
+}
+
+case "$1" in
+ start)
+ log_daemon_msg "Starting ${DESC}" $NAME
+ start_rabbitmq
+ start_stop_end "started"
+ ;;
+ stop)
+ log_daemon_msg "Stopping ${DESC}" $NAME
+ stop_rabbitmq
+ start_stop_end "stopped"
+ ;;
+ status)
+ status_rabbitmq
+ ;;
+ rotate-logs)
+ log_action_begin_msg "Rotating log files for ${DESC} ${NAME}"
+ rotate_logs_rabbitmq
+ log_action_end_msg $RETVAL
+ ;;
+ force-reload|reload|restart)
+ log_daemon_msg "Restarting ${DESC}" $NAME
+ restart_rabbitmq
+ restart_end
+ ;;
+ try-restart)
+ log_daemon_msg "Restarting ${DESC}" $NAME
+ restart_running_rabbitmq
+ restart_end
+ ;;
+ *)
+ echo "Usage: $0 {start|stop|status|rotate-logs|restart|condrestart|try-restart|reload|force-reload}" >&2
+ RETVAL=1
+ ;;
+esac
+
+exit $RETVAL
diff --git a/packaging/debs/Debian/debian/rules b/packaging/debs/Debian/debian/rules
index 108b1ed512..ecb778df96 100644
--- a/packaging/debs/Debian/debian/rules
+++ b/packaging/debs/Debian/debian/rules
@@ -19,3 +19,4 @@ install/rabbitmq-server::
done
sed -e 's|@RABBIT_LIB@|/usr/lib/rabbitmq/lib/rabbitmq_server-$(DEB_UPSTREAM_VERSION)|g' <debian/postrm.in >debian/postrm
install -p -D -m 0755 debian/rabbitmq-server.ocf $(DEB_DESTDIR)usr/lib/ocf/resource.d/rabbitmq/rabbitmq-server
+ install -p -D -m 0644 debian/rabbitmq-server.default $(DEB_DESTDIR)etc/default/rabbitmq-server
diff --git a/src/mirrored_supervisor.erl b/src/mirrored_supervisor.erl
index 221f6a8727..4fe9398108 100644
--- a/src/mirrored_supervisor.erl
+++ b/src/mirrored_supervisor.erl
@@ -261,24 +261,19 @@ start_internal(Group, ChildSpecs) ->
%%----------------------------------------------------------------------------
-init({overall, Group, Init}) ->
- case Init of
- {ok, {Restart, ChildSpecs}} ->
- Delegate = {delegate, {?SUPERVISOR, start_link,
- [?MODULE, {delegate, Restart}]},
- temporary, 16#ffffffff, supervisor, [?SUPERVISOR]},
- Mirroring = {mirroring, {?MODULE, start_internal,
- [Group, ChildSpecs]},
- permanent, 16#ffffffff, worker, [?MODULE]},
- %% Important: Delegate MUST start before Mirroring so that
- %% when we shut down from above it shuts down last, so
- %% Mirroring does not see it die.
- %%
- %% See comment in handle_info('DOWN', ...) below
- {ok, {{one_for_all, 0, 1}, [Delegate, Mirroring]}};
- ignore ->
- ignore
- end;
+init({overall, _Group, ignore}) -> ignore;
+init({overall, Group, {ok, {Restart, ChildSpecs}}}) ->
+ %% Important: Delegate MUST start before Mirroring so that when we
+ %% shut down from above it shuts down last, so Mirroring does not
+ %% see it die.
+ %%
+ %% See comment in handle_info('DOWN', ...) below
+ {ok, {{one_for_all, 0, 1},
+ [{delegate, {?SUPERVISOR, start_link, [?MODULE, {delegate, Restart}]},
+ temporary, 16#ffffffff, supervisor, [?SUPERVISOR]},
+ {mirroring, {?MODULE, start_internal, [Group, ChildSpecs]},
+ permanent, 16#ffffffff, worker, [?MODULE]}]}};
+
init({delegate, Restart}) ->
{ok, {Restart, []}};
@@ -306,9 +301,9 @@ handle_call({init, Overall}, _From,
Delegate = child(Overall, delegate),
erlang:monitor(process, Delegate),
State1 = State#state{overall = Overall, delegate = Delegate},
- case all_started([maybe_start(Group, Delegate, S) || S <- ChildSpecs]) of
- true -> {reply, ok, State1};
- false -> {stop, shutdown, State1}
+ case errors([maybe_start(Group, Delegate, S) || S <- ChildSpecs]) of
+ [] -> {reply, ok, State1};
+ Errors -> {stop, {shutdown, Errors}, State1}
end;
handle_call({start_child, ChildSpec}, _From,
@@ -372,9 +367,9 @@ handle_info({'DOWN', _Ref, process, Pid, _Reason},
[start(Delegate, ChildSpec) || ChildSpec <- ChildSpecs];
_ -> []
end,
- case all_started(R) of
- true -> {noreply, State};
- false -> {stop, shutdown, State}
+ case errors(R) of
+ [] -> {noreply, State};
+ Errors -> {stop, {shutdown, Errors}, State}
end;
handle_info(Info, State) ->
@@ -468,7 +463,7 @@ delete_all(Group) ->
[delete(Group, id(C)) ||
C <- mnesia:select(?TABLE, [{MatchHead, [], ['$1']}])].
-all_started(Results) -> [] =:= [R || R = {error, _} <- Results].
+errors(Results) -> [E || {error, E} <- Results].
%%----------------------------------------------------------------------------
diff --git a/src/mirrored_supervisor_tests.erl b/src/mirrored_supervisor_tests.erl
index e8baabe8ff..60192b34d7 100644
--- a/src/mirrored_supervisor_tests.erl
+++ b/src/mirrored_supervisor_tests.erl
@@ -51,7 +51,7 @@ test_migrate() ->
with_sups(fun([A, _]) ->
?MS:start_child(a, childspec(worker)),
Pid1 = pid_of(worker),
- kill(A, Pid1),
+ kill_registered(A, Pid1),
Pid2 = pid_of(worker),
false = (Pid1 =:= Pid2)
end, [a, b]).
@@ -61,10 +61,10 @@ test_migrate_twice() ->
with_sups(fun([A, B]) ->
?MS:start_child(a, childspec(worker)),
Pid1 = pid_of(worker),
- kill(A, Pid1),
+ kill_registered(A, Pid1),
{ok, C} = start_sup(c),
Pid2 = pid_of(worker),
- kill(B, Pid2),
+ kill_registered(B, Pid2),
Pid3 = pid_of(worker),
false = (Pid1 =:= Pid3),
kill(C)
@@ -124,7 +124,7 @@ test_large_group() ->
with_sups(fun([A, _, _, _]) ->
?MS:start_child(a, childspec(worker)),
Pid1 = pid_of(worker),
- kill(A, Pid1),
+ kill_registered(A, Pid1),
Pid2 = pid_of(worker),
false = (Pid1 =:= Pid2)
end, [a, b, c, d]).
@@ -134,7 +134,7 @@ test_childspecs_at_init() ->
S = childspec(worker),
with_sups(fun([A, _]) ->
Pid1 = pid_of(worker),
- kill(A, Pid1),
+ kill_registered(A, Pid1),
Pid2 = pid_of(worker),
false = (Pid1 =:= Pid2)
end, [{a, [S]}, {b, [S]}]).
@@ -143,7 +143,7 @@ test_anonymous_supervisors() ->
with_sups(fun([A, _B]) ->
?MS:start_child(A, childspec(worker)),
Pid1 = pid_of(worker),
- kill(A, Pid1),
+ kill_registered(A, Pid1),
Pid2 = pid_of(worker),
false = (Pid1 =:= Pid2)
end, [anon, anon]).
@@ -289,6 +289,12 @@ kill(Pid, Waits) ->
kill_wait(Pid),
[kill_wait(P) || P <- Waits].
+kill_registered(Pid, Child) ->
+ {registered_name, Name} = erlang:process_info(Child, registered_name),
+ kill(Pid, Child),
+ false = (Child =:= whereis(Name)),
+ ok.
+
kill_wait(Pid) ->
receive
{'DOWN', _Ref, process, Pid, _Reason} ->
diff --git a/src/rabbit.erl b/src/rabbit.erl
index eec7e34e8e..ea9731b68b 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -197,7 +197,7 @@
rabbit_queue_index, gen, dict, ordsets, file_handle_cache,
rabbit_msg_store, array, rabbit_msg_store_ets_index, rabbit_msg_file,
rabbit_exchange_type_fanout, rabbit_exchange_type_topic, mnesia,
- mnesia_lib, rpc, mnesia_tm, qlc, sofs, proplists, credit_flow]).
+ mnesia_lib, rpc, mnesia_tm, qlc, sofs, proplists, credit_flow, pmon]).
%% HiPE compilation uses multiple cores anyway, but some bits are
%% IO-bound so we can go faster if we parallelise a bit more. In
@@ -347,10 +347,7 @@ status() ->
is_running() -> is_running(node()).
is_running(Node) ->
- case rpc:call(Node, application, which_applications, [infinity]) of
- {badrpc, _} -> false;
- Apps -> proplists:is_defined(rabbit, Apps)
- end.
+ rabbit_nodes:is_running(Node, rabbit).
environment() ->
lists:keysort(
@@ -624,15 +621,12 @@ log_location(Type) ->
rotate_logs(File, Suffix, Handler) ->
rotate_logs(File, Suffix, Handler, Handler).
-rotate_logs(File, Suffix, OldHandler, NewHandler) ->
- case File of
- undefined -> ok;
- tty -> ok;
- _ -> gen_event:swap_handler(
- error_logger,
- {OldHandler, swap},
- {NewHandler, {File, Suffix}})
- end.
+rotate_logs(undefined, _Suffix, _OldHandler, _NewHandler) -> ok;
+rotate_logs(tty, _Suffix, _OldHandler, _NewHandler) -> ok;
+rotate_logs(File, Suffix, OldHandler, NewHandler) ->
+ gen_event:swap_handler(error_logger,
+ {OldHandler, swap},
+ {NewHandler, {File, Suffix}}).
log_rotation_result({error, MainLogError}, {error, SaslLogError}) ->
{error, {{cannot_rotate_main_logs, MainLogError},
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 2063e557f4..5701efeb99 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -696,12 +696,18 @@ calculate_msg_expiry(TTL) -> now_micros() + (TTL * 1000).
drop_expired_messages(State = #q{ttl = undefined}) ->
State;
drop_expired_messages(State = #q{backing_queue_state = BQS,
- backing_queue = BQ}) ->
+ backing_queue = BQ }) ->
Now = now_micros(),
- BQS1 = BQ:dropwhile(
- fun (#message_properties{expiry = Expiry}) -> Now > Expiry end,
- dead_letter_fun(expired, State),
- BQS),
+ DLXFun = dead_letter_fun(expired, State),
+ ExpirePred = fun (#message_properties{expiry = Expiry}) -> Now > Expiry end,
+ case DLXFun of
+ undefined -> {undefined, BQS1} = BQ:dropwhile(ExpirePred, false, BQS),
+ BQS1;
+ _ -> {Msgs, BQS1} = BQ:dropwhile(ExpirePred, true, BQS),
+ lists:foreach(
+ fun({Msg, AckTag}) -> DLXFun(Msg, AckTag) end, Msgs),
+ BQS1
+ end,
ensure_ttl_timer(State#q{backing_queue_state = BQS1}).
ensure_ttl_timer(State = #q{backing_queue = BQ,
@@ -717,6 +723,14 @@ ensure_ttl_timer(State = #q{backing_queue = BQ,
ensure_ttl_timer(State) ->
State.
+ack_if_no_dlx(AckTags, State = #q{dlx = undefined,
+ backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ {_Guids, BQS1} = BQ:ack(AckTags, BQS),
+ State#q{backing_queue_state = BQS1};
+ack_if_no_dlx(_AckTags, State) ->
+ State.
+
dead_letter_fun(_Reason, #q{dlx = undefined}) ->
undefined;
dead_letter_fun(Reason, _State) ->
@@ -724,31 +738,24 @@ dead_letter_fun(Reason, _State) ->
gen_server2:cast(self(), {dead_letter, {Msg, AckTag}, Reason})
end.
-dead_letter_msg(Msg, AckTag, Reason, State = #q{dlx = DLX}) ->
- case rabbit_exchange:lookup(DLX) of
- {error, not_found} -> noreply(State);
- _ -> dead_letter_msg_existing_dlx(Msg, AckTag, Reason,
- State)
+dead_letter_publish(Msg, Reason, State = #q{publish_seqno = MsgSeqNo}) ->
+ DLMsg = #basic_message{exchange_name = XName} =
+ make_dead_letter_msg(Reason, Msg, State),
+ case rabbit_exchange:lookup(XName) of
+ {ok, X} ->
+ Delivery = rabbit_basic:delivery(false, false, DLMsg, MsgSeqNo),
+ {Queues, Cycles} = detect_dead_letter_cycles(
+ DLMsg, rabbit_exchange:route(X, Delivery)),
+ lists:foreach(fun log_cycle_once/1, Cycles),
+ QPids = rabbit_amqqueue:lookup(Queues),
+ {_, DeliveredQPids} = rabbit_amqqueue:deliver(QPids, Delivery),
+ DeliveredQPids;
+ {error, not_found} ->
+ []
end.
-dead_letter_publish(Msg, Reason,
- State = #q{publish_seqno = MsgSeqNo,
- dlx = DLX}) ->
- Delivery = #delivery{message = #basic_message{exchange_name = XName}} =
- rabbit_basic:delivery(
- false, false, make_dead_letter_msg(DLX, Reason, Msg, State),
- MsgSeqNo),
- {ok, X} = rabbit_exchange:lookup(XName),
- Queues = rabbit_exchange:route(X, Delivery),
- {Queues1, Cycles} = detect_dead_letter_cycles(Delivery, Queues),
- lists:foreach(fun log_cycle_once/1, Cycles),
- QPids = rabbit_amqqueue:lookup(Queues1),
- {_, DeliveredQPids} = rabbit_amqqueue:deliver(QPids, Delivery),
- DeliveredQPids.
-
-dead_letter_msg_existing_dlx(Msg, AckTag, Reason,
- State = #q{publish_seqno = MsgSeqNo,
- unconfirmed = UC}) ->
+dead_letter_msg(Msg, AckTag, Reason, State = #q{publish_seqno = MsgSeqNo,
+ unconfirmed = UC}) ->
QPids = dead_letter_publish(Msg, Reason, State),
State1 = State#q{queue_monitors = pmon:monitor_all(
QPids, State#q.queue_monitors),
@@ -807,8 +814,7 @@ cleanup_after_confirm(AckTags, State = #q{delayed_stop = DS,
false -> noreply(State1)
end.
-detect_dead_letter_cycles(#delivery{message = #basic_message{content = Content}},
- Queues) ->
+detect_dead_letter_cycles(#basic_message{content = Content}, Queues) ->
#content{properties = #'P_basic'{headers = Headers}} =
rabbit_binary_parser:ensure_content_decoded(Content),
NoCycles = {Queues, []},
@@ -835,31 +841,31 @@ detect_dead_letter_cycles(#delivery{message = #basic_message{content = Content}}
end
end.
-make_dead_letter_msg(DLX, Reason,
+make_dead_letter_msg(Reason,
Msg = #basic_message{content = Content,
exchange_name = Exchange,
routing_keys = RoutingKeys},
- State = #q{dlx_routing_key = DlxRoutingKey}) ->
+ State = #q{dlx = DLX, dlx_routing_key = DlxRoutingKey}) ->
{DeathRoutingKeys, HeadersFun1} =
case DlxRoutingKey of
undefined -> {RoutingKeys, fun (H) -> H end};
_ -> {[DlxRoutingKey],
fun (H) -> lists:keydelete(<<"CC">>, 1, H) end}
end,
+ ReasonBin = list_to_binary(atom_to_list(Reason)),
#resource{name = QName} = qname(State),
+ TimeSec = rabbit_misc:now_ms() div 1000,
HeadersFun2 =
fun (Headers) ->
%% The first routing key is the one specified in the
%% basic.publish; all others are CC or BCC keys.
- RoutingKeys1 =
- [hd(RoutingKeys) | rabbit_basic:header_routes(Headers)],
- Info = [{<<"reason">>,
- longstr, list_to_binary(atom_to_list(Reason))},
- {<<"queue">>, longstr, QName},
- {<<"time">>, timestamp, rabbit_misc:now_ms() div 1000},
- {<<"exchange">>, longstr, Exchange#resource.name},
- {<<"routing-keys">>, array,
- [{longstr, Key} || Key <- RoutingKeys1]}],
+ RKs = [hd(RoutingKeys) | rabbit_basic:header_routes(Headers)],
+ RKs1 = [{longstr, Key} || Key <- RKs],
+ Info = [{<<"reason">>, longstr, ReasonBin},
+ {<<"queue">>, longstr, QName},
+ {<<"time">>, timestamp, TimeSec},
+ {<<"exchange">>, longstr, Exchange#resource.name},
+ {<<"routing-keys">>, array, RKs1}],
HeadersFun1(rabbit_basic:append_table_header(<<"x-death">>,
Info, Headers))
end,
@@ -1234,11 +1240,13 @@ handle_cast({reject, AckTags, Requeue, ChPid}, State) ->
ChPid, AckTags, State,
case Requeue of
true -> fun (State1) -> requeue_and_run(AckTags, State1) end;
- false -> Fun = dead_letter_fun(rejected, State),
- fun (State1 = #q{backing_queue = BQ,
+ false -> fun (State1 = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
+ Fun = dead_letter_fun(rejected, State1),
BQS1 = BQ:fold(Fun, BQS, AckTags),
- State1#q{backing_queue_state = BQS1}
+ ack_if_no_dlx(
+ AckTags,
+ State1#q{backing_queue_state = BQS1})
end
end));
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 6cc1c3fd3e..dc144a0e53 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -26,15 +26,14 @@
('empty' |
%% Message, IsDelivered, AckTag, Remaining_Len
{rabbit_types:basic_message(), boolean(), Ack, non_neg_integer()})).
--type(is_durable() :: boolean()).
-type(attempt_recovery() :: boolean()).
-type(purged_msg_count() :: non_neg_integer()).
--type(confirm_required() :: boolean()).
-type(async_callback() :: fun ((atom(), fun ((atom(), state()) -> state())) -> 'ok')).
-type(duration() :: ('undefined' | 'infinity' | number())).
-type(msg_fun() :: fun((rabbit_types:basic_message(), ack()) -> 'ok') |
'undefined').
+-type(msg_pred() :: fun ((rabbit_types:message_properties()) -> boolean())).
%% Called on startup with a list of durable queue names. The queues
%% aren't being started at this point, but this call allows the
@@ -117,12 +116,14 @@
%% be ignored.
-callback drain_confirmed(state()) -> {[rabbit_guid:guid()], state()}.
-%% Drop messages from the head of the queue while the supplied
-%% predicate returns true. A callback function is supplied allowing
-%% callers access to messages that are about to be dropped.
--callback dropwhile(fun ((rabbit_types:message_properties()) -> boolean()), msg_fun(),
- state())
- -> state().
+%% Drop messages from the head of the queue while the supplied predicate returns
+%% true. Also accepts a boolean parameter that determines whether the messages
+%% necessitate an ack or not. If they do, the function returns a list of
+%% messages with the respective acktags.
+-callback dropwhile(msg_pred(), true, state())
+ -> {[{rabbit_types:basic_message(), ack()}], state()};
+ (msg_pred(), false, state())
+ -> {undefined, state()}.
%% Produce the next message.
-callback fetch(true, state()) -> {fetch_result(ack()), state()};
diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl
index 286b69e4ac..a84800c0ec 100644
--- a/src/rabbit_backing_queue_qc.erl
+++ b/src/rabbit_backing_queue_qc.erl
@@ -141,7 +141,7 @@ qc_drain_confirmed(#state{bqstate = BQ}) ->
{call, ?BQMOD, drain_confirmed, [BQ]}.
qc_dropwhile(#state{bqstate = BQ}) ->
- {call, ?BQMOD, dropwhile, [fun dropfun/1, fun (_,_) -> ok end, BQ]}.
+ {call, ?BQMOD, dropwhile, [fun dropfun/1, false, BQ]}.
qc_is_empty(#state{bqstate = BQ}) ->
{call, ?BQMOD, is_empty, [BQ]}.
@@ -267,10 +267,11 @@ next_state(S, Res, {call, ?BQMOD, drain_confirmed, _Args}) ->
BQ1 = {call, erlang, element, [2, Res]},
S#state{bqstate = BQ1};
-next_state(S, BQ1, {call, ?BQMOD, dropwhile, _Args}) ->
+next_state(S, Res, {call, ?BQMOD, dropwhile, _Args}) ->
+ BQ = {call, erlang, element, [2, Res]},
#state{messages = Messages} = S,
Msgs1 = drop_messages(Messages),
- S#state{bqstate = BQ1, len = gb_trees:size(Msgs1), messages = Msgs1};
+ S#state{bqstate = BQ, len = gb_trees:size(Msgs1), messages = Msgs1};
next_state(S, _Res, {call, ?BQMOD, is_empty, _Args}) ->
S;
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index 8ad5901629..734456d35f 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -63,7 +63,7 @@
-spec(extract_headers/1 :: (rabbit_types:content()) -> headers()).
--spec(map_headers/2 :: (rabbit_types:content(), fun((headers()) -> headers()))
+-spec(map_headers/2 :: (fun((headers()) -> headers()), rabbit_types:content())
-> rabbit_types:content()).
-spec(header_routes/1 ::
@@ -224,6 +224,5 @@ header_routes(HeadersTable) ->
{array, Routes} -> [Route || {longstr, Route} <- Routes];
undefined -> [];
{Type, _Val} -> throw({error, {unacceptable_type_in_header,
- Type,
- binary_to_list(HeaderKey)}})
+ binary_to_list(HeaderKey), Type}})
end || HeaderKey <- ?ROUTING_HEADERS]).
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 846890a145..22c6a22361 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -36,9 +36,9 @@
conn_name, limiter, tx_status, next_tag, unacked_message_q,
uncommitted_message_q, uncommitted_acks, uncommitted_nacks, user,
virtual_host, most_recently_declared_queue, queue_monitors,
- consumer_mapping, blocking, queue_consumers, queue_collector_pid,
- stats_timer, confirm_enabled, publish_seqno, unconfirmed,
- confirmed, capabilities, trace_state}).
+ consumer_mapping, blocking, queue_consumers, delivering_queues,
+ queue_collector_pid, stats_timer, confirm_enabled, publish_seqno,
+ unconfirmed, confirmed, capabilities, trace_state}).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
@@ -198,6 +198,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
consumer_mapping = dict:new(),
blocking = sets:new(),
queue_consumers = dict:new(),
+ delivering_queues = sets:new(),
queue_collector_pid = CollectorPid,
confirm_enabled = false,
publish_seqno = 1,
@@ -331,10 +332,11 @@ handle_info({'DOWN', _MRef, process, QPid, Reason}, State) ->
State1 = handle_publishing_queue_down(QPid, Reason, State),
State2 = queue_blocked(QPid, State1),
State3 = handle_consuming_queue_down(QPid, State2),
+ State4 = handle_delivering_queue_down(QPid, State3),
credit_flow:peer_down(QPid),
erase_queue_stats(QPid),
noreply(State3#ch{queue_monitors = pmon:erase(
- QPid, State3#ch.queue_monitors)});
+ QPid, State4#ch.queue_monitors)});
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State}.
@@ -657,7 +659,7 @@ handle_method(#'basic.get'{queue = QueueNameBin,
QueueName, ConnPid,
fun (Q) -> rabbit_amqqueue:basic_get(Q, self(), NoAck) end) of
{ok, MessageCount,
- Msg = {_QName, _QPid, _MsgId, Redelivered,
+ Msg = {_QName, QPid, _MsgId, Redelivered,
#basic_message{exchange_name = ExchangeName,
routing_keys = [RoutingKey | _CcRoutes],
content = Content}}} ->
@@ -669,7 +671,8 @@ handle_method(#'basic.get'{queue = QueueNameBin,
routing_key = RoutingKey,
message_count = MessageCount},
Content),
- {noreply, record_sent(none, not(NoAck), Msg, State)};
+ State1 = monitor_delivering_queue(NoAck, QPid, State),
+ {noreply, record_sent(none, not(NoAck), Msg, State1)};
empty ->
{reply, #'basic.get_empty'{}, State}
end;
@@ -707,10 +710,10 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
consumer_tag = ActualConsumerTag})),
Q}
end) of
- {ok, Q} ->
- State1 = State#ch{consumer_mapping =
- dict:store(ActualConsumerTag, Q,
- ConsumerMapping)},
+ {ok, Q = #amqqueue{pid = QPid}} ->
+ CM1 = dict:store(ActualConsumerTag, Q, ConsumerMapping),
+ State1 = monitor_delivering_queue(
+ NoAck, QPid, State#ch{consumer_mapping = CM1}),
{noreply,
case NoWait of
true -> consumer_monitor(ActualConsumerTag, State1);
@@ -1108,6 +1111,13 @@ consumer_monitor(ConsumerTag,
State
end.
+monitor_delivering_queue(true, _QPid, State) ->
+ State;
+monitor_delivering_queue(false, QPid, State = #ch{queue_monitors = QMons,
+ delivering_queues = DQ}) ->
+ State#ch{queue_monitors = pmon:monitor(QPid, QMons),
+ delivering_queues = sets:add_element(QPid, DQ)}.
+
handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC}) ->
case rabbit_misc:is_abnormal_termination(Reason) of
true -> {MXs, UC1} = dtree:take_all(QPid, UC),
@@ -1134,6 +1144,9 @@ handle_consuming_queue_down(QPid,
State#ch{consumer_mapping = ConsumerMapping1,
queue_consumers = dict:erase(QPid, QCons)}.
+handle_delivering_queue_down(QPid, State = #ch{delivering_queues = DQ}) ->
+ State#ch{delivering_queues = sets:del_element(QPid, DQ)}.
+
binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin,
RoutingKey, Arguments, ReturnMethod, NoWait,
State = #ch{virtual_host = VHostPath,
@@ -1269,9 +1282,11 @@ new_tx(State) -> State#ch{uncommitted_message_q = queue:new(),
notify_queues(State = #ch{state = closing}) ->
{ok, State};
-notify_queues(State = #ch{consumer_mapping = Consumers}) ->
- {rabbit_amqqueue:notify_down_all(consumer_queues(Consumers), self()),
- State#ch{state = closing}}.
+notify_queues(State = #ch{consumer_mapping = Consumers,
+ delivering_queues = DQ }) ->
+ QPids = sets:to_list(
+ sets:union(sets:from_list(consumer_queues(Consumers)), DQ)),
+ {rabbit_amqqueue:notify_down_all(QPids, self()), State#ch{state = closing}}.
fold_per_queue(_F, Acc, []) ->
Acc;
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 9f99f17a0b..2dea2a2f34 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -194,7 +194,11 @@ action(force_cluster, Node, ClusterNodeSs, _Opts, Inform) ->
action(wait, Node, [PidFile], _Opts, Inform) ->
Inform("Waiting for ~p", [Node]),
- wait_for_application(Node, PidFile, Inform);
+ wait_for_application(Node, PidFile, rabbit, Inform);
+
+action(wait, Node, [PidFile, App], _Opts, Inform) ->
+ Inform("Waiting for ~p on ~p", [App, Node]),
+ wait_for_application(Node, PidFile, list_to_atom(App), Inform);
action(status, Node, [], _Opts, Inform) ->
Inform("Status of node ~p", [Node]),
@@ -212,33 +216,33 @@ action(rotate_logs, Node, [], _Opts, Inform) ->
Inform("Reopening logs for node ~p", [Node]),
call(Node, {rabbit, rotate_logs, [""]});
action(rotate_logs, Node, Args = [Suffix], _Opts, Inform) ->
- Inform("Rotating logs to files with suffix ~p", [Suffix]),
+ Inform("Rotating logs to files with suffix \"~s\"", [Suffix]),
call(Node, {rabbit, rotate_logs, Args});
action(close_connection, Node, [PidStr, Explanation], _Opts, Inform) ->
- Inform("Closing connection ~s", [PidStr]),
+ Inform("Closing connection \"~s\"", [PidStr]),
rpc_call(Node, rabbit_networking, close_connection,
[rabbit_misc:string_to_pid(PidStr), Explanation]);
action(add_user, Node, Args = [Username, _Password], _Opts, Inform) ->
- Inform("Creating user ~p", [Username]),
+ Inform("Creating user \"~s\"", [Username]),
call(Node, {rabbit_auth_backend_internal, add_user, Args});
action(delete_user, Node, Args = [_Username], _Opts, Inform) ->
- Inform("Deleting user ~p", Args),
+ Inform("Deleting user \"~s\"", Args),
call(Node, {rabbit_auth_backend_internal, delete_user, Args});
action(change_password, Node, Args = [Username, _Newpassword], _Opts, Inform) ->
- Inform("Changing password for user ~p", [Username]),
+ Inform("Changing password for user \"~s\"", [Username]),
call(Node, {rabbit_auth_backend_internal, change_password, Args});
action(clear_password, Node, Args = [Username], _Opts, Inform) ->
- Inform("Clearing password for user ~p", [Username]),
+ Inform("Clearing password for user \"~s\"", [Username]),
call(Node, {rabbit_auth_backend_internal, clear_password, Args});
action(set_user_tags, Node, [Username | TagsStr], _Opts, Inform) ->
Tags = [list_to_atom(T) || T <- TagsStr],
- Inform("Setting tags for user ~p to ~p", [Username, Tags]),
+ Inform("Setting tags for user \"~s\" to ~p", [Username, Tags]),
rpc_call(Node, rabbit_auth_backend_internal, set_tags,
[list_to_binary(Username), Tags]);
@@ -249,11 +253,11 @@ action(list_users, Node, [], _Opts, Inform) ->
rabbit_auth_backend_internal:user_info_keys());
action(add_vhost, Node, Args = [_VHostPath], _Opts, Inform) ->
- Inform("Creating vhost ~p", Args),
+ Inform("Creating vhost \"~s\"", Args),
call(Node, {rabbit_vhost, add, Args});
action(delete_vhost, Node, Args = [_VHostPath], _Opts, Inform) ->
- Inform("Deleting vhost ~p", Args),
+ Inform("Deleting vhost \"~s\"", Args),
call(Node, {rabbit_vhost, delete, Args});
action(list_vhosts, Node, Args, _Opts, Inform) ->
@@ -315,12 +319,12 @@ action(list_consumers, Node, _Args, Opts, Inform) ->
action(trace_on, Node, [], Opts, Inform) ->
VHost = proplists:get_value(?VHOST_OPT, Opts),
- Inform("Starting tracing for vhost ~p", [VHost]),
+ Inform("Starting tracing for vhost \"~s\"", [VHost]),
rpc_call(Node, rabbit_trace, start, [list_to_binary(VHost)]);
action(trace_off, Node, [], Opts, Inform) ->
VHost = proplists:get_value(?VHOST_OPT, Opts),
- Inform("Stopping tracing for vhost ~p", [VHost]),
+ Inform("Stopping tracing for vhost \"~s\"", [VHost]),
rpc_call(Node, rabbit_trace, stop, [list_to_binary(VHost)]);
action(set_vm_memory_high_watermark, Node, [Arg], _Opts, Inform) ->
@@ -333,19 +337,21 @@ action(set_vm_memory_high_watermark, Node, [Arg], _Opts, Inform) ->
action(set_permissions, Node, [Username, CPerm, WPerm, RPerm], Opts, Inform) ->
VHost = proplists:get_value(?VHOST_OPT, Opts),
- Inform("Setting permissions for user ~p in vhost ~p", [Username, VHost]),
+ Inform("Setting permissions for user \"~s\" in vhost \"~s\"",
+ [Username, VHost]),
call(Node, {rabbit_auth_backend_internal, set_permissions,
[Username, VHost, CPerm, WPerm, RPerm]});
action(clear_permissions, Node, [Username], Opts, Inform) ->
VHost = proplists:get_value(?VHOST_OPT, Opts),
- Inform("Clearing permissions for user ~p in vhost ~p", [Username, VHost]),
+ Inform("Clearing permissions for user \"~s\" in vhost \"~s\"",
+ [Username, VHost]),
call(Node, {rabbit_auth_backend_internal, clear_permissions,
[Username, VHost]});
action(list_permissions, Node, [], Opts, Inform) ->
VHost = proplists:get_value(?VHOST_OPT, Opts),
- Inform("Listing permissions in vhost ~p", [VHost]),
+ Inform("Listing permissions in vhost \"~s\"", [VHost]),
display_info_list(call(Node, {rabbit_auth_backend_internal,
list_vhost_permissions, [VHost]}),
rabbit_auth_backend_internal:vhost_perms_info_keys());
@@ -396,17 +402,17 @@ action(eval, Node, [Expr], _Opts, _Inform) ->
%%----------------------------------------------------------------------------
-wait_for_application(Node, PidFile, Inform) ->
+wait_for_application(Node, PidFile, Application, Inform) ->
Pid = read_pid_file(PidFile, true),
Inform("pid is ~s", [Pid]),
- wait_for_application(Node, Pid).
+ wait_for_application(Node, Pid, Application).
-wait_for_application(Node, Pid) ->
+wait_for_application(Node, Pid, Application) ->
case process_up(Pid) of
- true -> case rabbit:is_running(Node) of
+ true -> case rabbit_nodes:is_running(Node, Application) of
true -> ok;
false -> timer:sleep(?EXTERNAL_CHECK_INTERVAL),
- wait_for_application(Node, Pid)
+ wait_for_application(Node, Pid, Application)
end;
false -> {error, process_not_running}
end.
diff --git a/src/rabbit_disk_monitor.erl b/src/rabbit_disk_monitor.erl
index 831d52901b..b1750b61ad 100644
--- a/src/rabbit_disk_monitor.erl
+++ b/src/rabbit_disk_monitor.erl
@@ -87,9 +87,9 @@ init([Limit]) ->
vm_memory_monitor:get_total_memory()} of
{N1, N2} when is_integer(N1), is_integer(N2) ->
{ok, set_disk_limits(State, Limit)};
- _ ->
+ Err ->
rabbit_log:info("Disabling disk free space monitoring "
- "on unsupported platform~n"),
+ "on unsupported platform: ~p~n", [Err]),
{stop, unsupported_platform}
end.
@@ -167,7 +167,7 @@ get_disk_free(Dir, {unix, Sun})
get_disk_free(Dir, {unix, _}) ->
parse_free_unix(rabbit_misc:os_cmd("/bin/df -kP " ++ Dir));
get_disk_free(Dir, {win32, _}) ->
- parse_free_win32(rabbit_misc:os_cmd("dir /-C /W " ++ Dir));
+ parse_free_win32(os:cmd("dir /-C /W \"" ++ Dir ++ [$"]));
get_disk_free(_, _) ->
unknown.
diff --git a/src/rabbit_event.erl b/src/rabbit_event.erl
index 4ec141cfd8..3f1b20fed6 100644
--- a/src/rabbit_event.erl
+++ b/src/rabbit_event.erl
@@ -139,6 +139,6 @@ notify_if(false, _Type, _Props) -> ok.
notify(Type, Props) ->
%% TODO: switch to os:timestamp() when we drop support for
%% Erlang/OTP < R13B01
- gen_event:notify(rabbit_event, #event{type = Type,
- props = Props,
- timestamp = now()}).
+ gen_event:notify(?MODULE, #event{type = Type,
+ props = Props,
+ timestamp = now()}).
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 9fa6213b94..2b15498ed9 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -192,7 +192,7 @@ terminate(_, _) ->
ok.
code_change(_, State, _) ->
- State.
+ {ok, State}.
%%----------------------------------------------------------------------------
%% Internal plumbing
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl
index 2d155d14e3..17e2ffb472 100644
--- a/src/rabbit_mirror_queue_coordinator.erl
+++ b/src/rabbit_mirror_queue_coordinator.erl
@@ -356,7 +356,7 @@ handle_cast(request_length, State = #state { length_fun = LengthFun }) ->
handle_cast({ensure_monitoring, Pids}, State = #state { monitors = Mons }) ->
noreply(State #state { monitors = pmon:monitor_all(Pids, Mons) }).
-handle_info(send_gm_heartbeat, State = #state{gm = GM}) ->
+handle_info(send_gm_heartbeat, State = #state { gm = GM }) ->
gm:broadcast(GM, heartbeat),
ensure_gm_heartbeat(),
noreply(State);
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 04b7514f71..4e71cc43db 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -134,7 +134,7 @@ delete_and_terminate(Reason, State = #state { gm = GM,
purge(State = #state { gm = GM,
backing_queue = BQ,
backing_queue_state = BQS }) ->
- ok = gm:broadcast(GM, {set_length, 0}),
+ ok = gm:broadcast(GM, {set_length, 0, false}),
{Count, BQS1} = BQ:purge(BQS),
{Count, State #state { backing_queue_state = BQS1,
set_delivered = 0 }}.
@@ -168,19 +168,19 @@ publish_delivered(AckRequired, Msg = #basic_message { id = MsgId }, MsgProps,
ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1,
ack_msg_id = AM1 })}.
-dropwhile(Pred, MsgFun,
+dropwhile(Pred, AckRequired,
State = #state{gm = GM,
backing_queue = BQ,
set_delivered = SetDelivered,
backing_queue_state = BQS }) ->
Len = BQ:len(BQS),
- BQS1 = BQ:dropwhile(Pred, MsgFun, BQS),
+ {Msgs, BQS1} = BQ:dropwhile(Pred, AckRequired, BQS),
Len1 = BQ:len(BQS1),
- ok = gm:broadcast(GM, {set_length, Len1}),
+ ok = gm:broadcast(GM, {set_length, Len1, AckRequired}),
Dropped = Len - Len1,
SetDelivered1 = lists:max([0, SetDelivered - Dropped]),
- State #state { backing_queue_state = BQS1,
- set_delivered = SetDelivered1 }.
+ {Msgs, State #state { backing_queue_state = BQS1,
+ set_delivered = SetDelivered1 } }.
drain_confirmed(State = #state { backing_queue = BQ,
backing_queue_state = BQS,
@@ -246,12 +246,9 @@ ack(AckTags, State = #state { gm = GM,
{MsgIds, State #state { backing_queue_state = BQS1,
ack_msg_id = AM1 }}.
-fold(MsgFun, State = #state { gm = GM,
- backing_queue = BQ,
- backing_queue_state = BQS}, AckTags) ->
- BQS1 = BQ:fold(MsgFun, BQS, AckTags),
- ok = gm:broadcast(GM, {fold, MsgFun, AckTags}),
- State #state { backing_queue_state = BQS1 }.
+fold(MsgFun, State = #state { backing_queue = BQ,
+ backing_queue_state = BQS }, AckTags) ->
+ State #state { backing_queue_state = BQ:fold(MsgFun, BQS, AckTags) }.
requeue(AckTags, State = #state { gm = GM,
backing_queue = BQ,
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 4b095209da..e412fbbc5d 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -793,23 +793,27 @@ process_instruction({discard, ChPid, Msg = #basic_message { id = MsgId }},
{ok, State1 #state { sender_queues = SQ1,
msg_id_status = MS1,
backing_queue_state = BQS1 }};
-process_instruction({set_length, Length},
+process_instruction({set_length, Length, AckRequired},
State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
QLen = BQ:len(BQS),
ToDrop = QLen - Length,
- {ok, case ToDrop >= 0 of
- true -> BQS1 =
- lists:foldl(
- fun (const, BQSN) ->
- {{_Msg, _IsDelivered, _AckTag, _Remaining},
- BQSN1} = BQ:fetch(false, BQSN),
- BQSN1
- end, BQS, lists:duplicate(ToDrop, const)),
- set_synchronised(
- true, State #state { backing_queue_state = BQS1 });
- false -> State
- end};
+ {ok,
+ case ToDrop >= 0 of
+ true ->
+ State1 =
+ lists:foldl(
+ fun (const, StateN = #state {backing_queue_state = BQSN}) ->
+ {{#basic_message{id = MsgId}, _IsDelivered, AckTag,
+ _Remaining}, BQSN1} = BQ:fetch(AckRequired, BQSN),
+ maybe_store_ack(
+ AckRequired, MsgId, AckTag,
+ StateN #state { backing_queue_state = BQSN1 })
+ end, State, lists:duplicate(ToDrop, const)),
+ set_synchronised(true, State1);
+ false ->
+ State
+ end};
process_instruction({fetch, AckRequired, MsgId, Remaining},
State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
@@ -835,11 +839,6 @@ process_instruction({ack, MsgIds},
[] = MsgIds1 -- MsgIds, %% ASSERTION
{ok, State #state { msg_id_ack = MA1,
backing_queue_state = BQS1 }};
-process_instruction({fold, MsgFun, AckTags},
- State = #state { backing_queue = BQ,
- backing_queue_state = BQS }) ->
- BQS1 = BQ:fold(MsgFun, BQS, AckTags),
- {ok, State #state { backing_queue_state = BQS1 }};
process_instruction({requeue, MsgIds},
State = #state { backing_queue = BQ,
backing_queue_state = BQS,
diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl
index e6a0533547..1a12d43b74 100644
--- a/src/rabbit_net.erl
+++ b/src/rabbit_net.erl
@@ -18,8 +18,8 @@
-include("rabbit.hrl").
-export([is_ssl/1, ssl_info/1, controlling_process/2, getstat/2,
- recv/1, async_recv/3, port_command/2, setopts/2, send/2, close/1,
- maybe_fast_close/1, sockname/1, peername/1, peercert/1,
+ recv/1, async_recv/3, port_command/2, getopts/2, setopts/2, send/2,
+ close/1, maybe_fast_close/1, sockname/1, peername/1, peercert/1,
connection_string/2]).
%%---------------------------------------------------------------------------
@@ -34,6 +34,8 @@
-type(ok_val_or_error(A) :: rabbit_types:ok_or_error2(A, any())).
-type(ok_or_any_error() :: rabbit_types:ok_or_error(any())).
-type(socket() :: port() | #ssl_socket{}).
+-type(opts() :: [{atom(), any()} |
+ {raw, non_neg_integer(), non_neg_integer(), binary()}]).
-spec(is_ssl/1 :: (socket()) -> boolean()).
-spec(ssl_info/1 :: (socket())
@@ -49,9 +51,12 @@
-spec(async_recv/3 ::
(socket(), integer(), timeout()) -> rabbit_types:ok(any())).
-spec(port_command/2 :: (socket(), iolist()) -> 'true').
--spec(setopts/2 :: (socket(), [{atom(), any()} |
- {raw, non_neg_integer(), non_neg_integer(),
- binary()}]) -> ok_or_any_error()).
+-spec(getopts/2 :: (socket(), [atom() | {raw,
+ non_neg_integer(),
+ non_neg_integer(),
+ non_neg_integer() | binary()}])
+ -> ok_val_or_error(opts())).
+-spec(setopts/2 :: (socket(), opts()) -> ok_or_any_error()).
-spec(send/2 :: (socket(), binary() | iolist()) -> ok_or_any_error()).
-spec(close/1 :: (socket()) -> ok_or_any_error()).
-spec(maybe_fast_close/1 :: (socket()) -> ok_or_any_error()).
@@ -126,6 +131,11 @@ port_command(Sock, Data) when ?IS_SSL(Sock) ->
port_command(Sock, Data) when is_port(Sock) ->
erlang:port_command(Sock, Data).
+getopts(Sock, Options) when ?IS_SSL(Sock) ->
+ ssl:getopts(Sock#ssl_socket.ssl, Options);
+getopts(Sock, Options) when is_port(Sock) ->
+ inet:getopts(Sock, Options).
+
setopts(Sock, Options) when ?IS_SSL(Sock) ->
ssl:setopts(Sock#ssl_socket.ssl, Options);
setopts(Sock, Options) when is_port(Sock) ->
diff --git a/src/rabbit_nodes.erl b/src/rabbit_nodes.erl
index 28ba469863..1c23632d52 100644
--- a/src/rabbit_nodes.erl
+++ b/src/rabbit_nodes.erl
@@ -16,7 +16,7 @@
-module(rabbit_nodes).
--export([names/1, diagnostics/1, make/1, parts/1, cookie_hash/0]).
+-export([names/1, diagnostics/1, make/1, parts/1, cookie_hash/0, is_running/2]).
-define(EPMD_TIMEOUT, 30000).
@@ -32,6 +32,7 @@
-spec(make/1 :: ({string(), string()} | string()) -> node()).
-spec(parts/1 :: (node() | string()) -> {string(), string()}).
-spec(cookie_hash/0 :: () -> string()).
+-spec(is_running/2 :: (node(), atom()) -> boolean()).
-endif.
@@ -91,3 +92,9 @@ parts(NodeStr) ->
cookie_hash() ->
base64:encode_to_string(erlang:md5(atom_to_list(erlang:get_cookie()))).
+
+is_running(Node, Application) ->
+ case rpc:call(Node, application, which_applications, [infinity]) of
+ {badrpc, _} -> false;
+ Apps -> proplists:is_defined(Application, Apps)
+ end.
diff --git a/src/rabbit_plugins.erl b/src/rabbit_plugins.erl
index 2a93c8f2f7..00880fb2e2 100644
--- a/src/rabbit_plugins.erl
+++ b/src/rabbit_plugins.erl
@@ -245,7 +245,7 @@ format_plugins(Pattern, Opts, PluginsFile, PluginsDir) ->
{true, true} -> throw({error_string,
"Cannot specify -m and -v together"})
end,
- OnlyEnabled = proplists:get_bool(?ENABLED_OPT, Opts),
+ OnlyEnabled = proplists:get_bool(?ENABLED_OPT, Opts),
OnlyEnabledAll = proplists:get_bool(?ENABLED_ALL_OPT, Opts),
AvailablePlugins = find_plugins(PluginsDir),
@@ -257,14 +257,10 @@ format_plugins(Pattern, Opts, PluginsFile, PluginsDir) ->
Plugins = [ Plugin ||
Plugin = #plugin{name = Name} <- AvailablePlugins,
re:run(atom_to_list(Name), RE, [{capture, none}]) =:= match,
- if OnlyEnabled -> lists:member(Name, EnabledExplicitly);
- true -> true
- end,
- if OnlyEnabledAll ->
- lists:member(Name, EnabledImplicitly) or
- lists:member(Name, EnabledExplicitly);
- true ->
- true
+ if OnlyEnabled -> lists:member(Name, EnabledExplicitly);
+ OnlyEnabledAll -> (lists:member(Name, EnabledExplicitly) or
+ lists:member(Name, EnabledImplicitly));
+ true -> true
end],
Plugins1 = usort_plugins(Plugins),
MaxWidth = lists:max([length(atom_to_list(Name)) ||
@@ -338,8 +334,8 @@ read_enabled_plugins(PluginsFile) ->
case rabbit_file:read_term_file(PluginsFile) of
{ok, [Plugins]} -> Plugins;
{ok, []} -> [];
- {ok, [_|_]} -> throw({error, {malformed_enabled_plugins_file,
- PluginsFile}});
+ {ok, [_|_]} -> throw({error, {malformed_enabled_plugins_file,
+ PluginsFile}});
{error, enoent} -> [];
{error, Reason} -> throw({error, {cannot_read_enabled_plugins_file,
PluginsFile, Reason}})
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 5e9e78d380..5acf6acaf6 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -222,6 +222,14 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
last_blocked_by = none,
last_blocked_at = never},
try
+ BufSizes = inet_op(fun () ->
+ rabbit_net:getopts(
+ ClientSock, [sndbuf, recbuf, buffer])
+ end),
+ BufSz = lists:max([Sz || {_Opt, Sz} <- BufSizes]),
+ ok = inet_op(fun () ->
+ rabbit_net:setopts(ClientSock, [{buffer, BufSz}])
+ end),
recvloop(Deb, switch_callback(rabbit_event:init_stats_timer(
State, #v1.stats_timer),
handshake, 8)),
diff --git a/src/rabbit_runtime_parameters.erl b/src/rabbit_runtime_parameters.erl
index e1c417c410..172cee92db 100644
--- a/src/rabbit_runtime_parameters.erl
+++ b/src/rabbit_runtime_parameters.erl
@@ -88,7 +88,7 @@ set0(Component, Key, Term) ->
mnesia_update(Component, Key, Term) ->
rabbit_misc:execute_mnesia_transaction(
fun () ->
- Res = case mnesia:read(?TABLE, {Component, Key}) of
+ Res = case mnesia:read(?TABLE, {Component, Key}, read) of
[] -> new;
[Params] -> {old, Params#runtime_parameters.value}
end,
@@ -158,7 +158,7 @@ lookup0(Component, Key, DefaultFun) ->
lookup_missing(Component, Key, Default) ->
rabbit_misc:execute_mnesia_transaction(
fun () ->
- case mnesia:read(?TABLE, {Component, Key}) of
+ case mnesia:read(?TABLE, {Component, Key}, read) of
[] -> Record = c(Component, Key, Default),
mnesia:write(?TABLE, Record, write),
Record;
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index a502fa95c3..67abe06efe 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -2421,10 +2421,10 @@ test_dropwhile(VQ0) ->
fun (N, Props) -> Props#message_properties{expiry = N} end, VQ0),
%% drop the first 5 messages
- VQ2 = rabbit_variable_queue:dropwhile(
- fun(#message_properties { expiry = Expiry }) ->
- Expiry =< 5
- end, undefined, VQ1),
+ {undefined, VQ2} = rabbit_variable_queue:dropwhile(
+ fun(#message_properties { expiry = Expiry }) ->
+ Expiry =< 5
+ end, false, VQ1),
%% fetch five now
VQ3 = lists:foldl(fun (_N, VQN) ->
@@ -2441,11 +2441,13 @@ test_dropwhile(VQ0) ->
test_dropwhile_varying_ram_duration(VQ0) ->
VQ1 = variable_queue_publish(false, 1, VQ0),
VQ2 = rabbit_variable_queue:set_ram_duration_target(0, VQ1),
- VQ3 = rabbit_variable_queue:dropwhile(
- fun(_) -> false end, undefined, VQ2),
+ {undefined, VQ3} = rabbit_variable_queue:dropwhile(
+ fun(_) -> false end, false, VQ2),
VQ4 = rabbit_variable_queue:set_ram_duration_target(infinity, VQ3),
VQ5 = variable_queue_publish(false, 1, VQ4),
- rabbit_variable_queue:dropwhile(fun(_) -> false end, undefined, VQ5).
+ {undefined, VQ6} =
+ rabbit_variable_queue:dropwhile(fun(_) -> false end, false, VQ5),
+ VQ6.
test_variable_queue_dynamic_duration_change(VQ0) ->
SegmentSize = rabbit_queue_index:next_segment_boundary(0),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index f9315c5dff..dafb3f2ee7 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -16,13 +16,12 @@
-module(rabbit_variable_queue).
--export([init/3, terminate/2, delete_and_terminate/2,
- purge/1, publish/4, publish_delivered/5, drain_confirmed/1,
+-export([init/3, terminate/2, delete_and_terminate/2, purge/1,
+ publish/4, publish_delivered/5, drain_confirmed/1,
dropwhile/3, fetch/2, ack/2, requeue/2, len/1, is_empty/1,
- set_ram_duration_target/2, ram_duration/1,
- needs_timeout/1, timeout/1, handle_pre_hibernate/1,
- status/1, invoke/3, is_duplicate/2, discard/3,
- multiple_routing_keys/0, fold/3]).
+ set_ram_duration_target/2, ram_duration/1, needs_timeout/1,
+ timeout/1, handle_pre_hibernate/1, status/1, invoke/3,
+ is_duplicate/2, discard/3, multiple_routing_keys/0, fold/3]).
-export([start/1, stop/0]).
@@ -324,7 +323,6 @@
-type(timestamp() :: {non_neg_integer(), non_neg_integer(), non_neg_integer()}).
-type(seq_id() :: non_neg_integer()).
--type(ack() :: seq_id()).
-type(rates() :: #rates { egress :: {timestamp(), non_neg_integer()},
ingress :: {timestamp(), non_neg_integer()},
@@ -336,6 +334,13 @@
count :: non_neg_integer(),
end_seq_id :: non_neg_integer() }).
+%% The compiler (rightfully) complains that ack() and state() are
+%% unused. For this reason we duplicate a -spec from
+%% rabbit_backing_queue with the only intent being to remove
+%% warnings. The problem here is that we can't parameterise the BQ
+%% behaviour by these two types as we would like to. We still leave
+%% these here for documentation purposes.
+-type(ack() :: seq_id()).
-type(state() :: #vqstate {
q1 :: ?QUEUE:?QUEUE(),
q2 :: ?QUEUE:?QUEUE(),
@@ -369,6 +374,8 @@
ack_out_counter :: non_neg_integer(),
ack_in_counter :: non_neg_integer(),
ack_rates :: rates() }).
+%% Duplicated from rabbit_backing_queue
+-spec(ack/2 :: ([ack()], state()) -> {[rabbit_guid:guid()], state()}).
-spec(multiple_routing_keys/0 :: () -> 'ok').
@@ -579,23 +586,27 @@ drain_confirmed(State = #vqstate { confirmed = C }) ->
confirmed = gb_sets:new() }}
end.
-dropwhile(Pred, MsgFun, State) ->
+dropwhile(Pred, AckRequired, State) -> dropwhile(Pred, AckRequired, State, []).
+
+dropwhile(Pred, AckRequired, State, Msgs) ->
+ End = fun(S) when AckRequired -> {lists:reverse(Msgs), S};
+ (S) -> {undefined, S}
+ end,
case queue_out(State) of
{empty, State1} ->
- a(State1);
+ End(a(State1));
{{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} ->
- case {Pred(MsgProps), MsgFun} of
- {true, undefined} ->
- {_, State2} = internal_fetch(false, MsgStatus, State1),
- dropwhile(Pred, MsgFun, State2);
- {true, _} ->
+ case {Pred(MsgProps), AckRequired} of
+ {true, true} ->
{MsgStatus1, State2} = read_msg(MsgStatus, State1),
{{Msg, _, AckTag, _}, State3} =
internal_fetch(true, MsgStatus1, State2),
- MsgFun(Msg, AckTag),
- dropwhile(Pred, MsgFun, State3);
+ dropwhile(Pred, AckRequired, State3, [{Msg, AckTag} | Msgs]);
+ {true, false} ->
+ {_, State2} = internal_fetch(false, MsgStatus, State1),
+ dropwhile(Pred, AckRequired, State2, undefined);
{false, _} ->
- a(in_r(MsgStatus, State1))
+ End(a(in_r(MsgStatus, State1)))
end
end.
diff --git a/src/supervisor2.erl b/src/supervisor2.erl
index f1b748787a..3d3623d752 100644
--- a/src/supervisor2.erl
+++ b/src/supervisor2.erl
@@ -411,6 +411,8 @@ handle_call({start_child, EArgs}, _From, State) when ?is_simple(State) ->
#child{mfa = {M, F, A}} = hd(State#state.children),
Args = A ++ EArgs,
case do_start_child_i(M, F, Args) of
+ {ok, undefined} ->
+ {reply, {ok, undefined}, State};
{ok, Pid} ->
NState = State#state{dynamics =
?DICT:store(Pid, Args, State#state.dynamics)},
@@ -743,6 +745,8 @@ restart(Strategy, Child, State, Restart)
#child{mfa = {M, F, A}} = Child,
Dynamics = ?DICT:erase(Child#child.pid, State#state.dynamics),
case do_start_child_i(M, F, A) of
+ {ok, undefined} ->
+ {ok, State};
{ok, Pid} ->
NState = State#state{dynamics = ?DICT:store(Pid, A, Dynamics)},
{ok, NState};