diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2012-05-15 16:24:27 +0100 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2012-05-15 16:24:27 +0100 |
| commit | 01c649e6ef942271d12f6a7620ca2fdbb8c8464f (patch) | |
| tree | ec3b37880f79ecd1e517ab7e6b5e84b40c9c4cb8 | |
| parent | 48ed9f2a5bbc39ef752c7d698416b1d368c7322e (diff) | |
| parent | d0652f278fc17ed260738b51d6f49a0598d0dd2f (diff) | |
| download | rabbitmq-server-git-01c649e6ef942271d12f6a7620ca2fdbb8c8464f.tar.gz | |
Merge default
30 files changed, 476 insertions, 228 deletions
@@ -207,7 +207,7 @@ start-background-node: all -rm -f $(RABBITMQ_MNESIA_DIR).pid mkdir -p $(RABBITMQ_MNESIA_DIR) setsid sh -c "$(MAKE) run-background-node > $(RABBITMQ_MNESIA_DIR)/startup_log 2> $(RABBITMQ_MNESIA_DIR)/startup_err" & - sleep 1 + ./scripts/rabbitmqctl -n $(RABBITMQ_NODENAME) wait $(RABBITMQ_MNESIA_DIR).pid kernel start-rabbit-on-node: all echo "rabbit:start()." | $(ERL_CALL) diff --git a/packaging/RPMS/Fedora/Makefile b/packaging/RPMS/Fedora/Makefile index 234fc2c7d7..180500ed38 100644 --- a/packaging/RPMS/Fedora/Makefile +++ b/packaging/RPMS/Fedora/Makefile @@ -32,8 +32,8 @@ prepare: SPECS/rabbitmq-server.spec cp ${COMMON_DIR}/* SOURCES/ + cp rabbitmq-server.init SOURCES/rabbitmq-server.init sed -i \ - -e 's|^LOCK_FILE=.*$$|LOCK_FILE=/var/lock/subsys/$$NAME|' \ -e 's|^START_PROG=.*$$|START_PROG="$(START_PROG)"|' \ SOURCES/rabbitmq-server.init ifeq "$(RPM_OS)" "fedora" diff --git a/packaging/common/rabbitmq-server.init b/packaging/RPMS/Fedora/rabbitmq-server.init index c942f8e314..2d2680e3b2 100644 --- a/packaging/common/rabbitmq-server.init +++ b/packaging/RPMS/Fedora/rabbitmq-server.init @@ -27,7 +27,7 @@ INIT_LOG_DIR=/var/log/rabbitmq PID_FILE=/var/run/rabbitmq/pid START_PROG= # Set when building package -LOCK_FILE= # Set when building package +LOCK_FILE=/var/lock/subsys/$NAME test -x $DAEMON || exit 0 test -x $CONTROL || exit 0 @@ -35,6 +35,8 @@ test -x $CONTROL || exit 0 RETVAL=0 set -e +[ -f /etc/default/${NAME} ] && . /etc/default/${NAME} + ensure_pid_dir () { PID_DIR=`dirname ${PID_FILE}` if [ ! -d ${PID_DIR} ] ; then diff --git a/packaging/debs/Debian/Makefile b/packaging/debs/Debian/Makefile index 2a738f6e7a..844388c6f4 100644 --- a/packaging/debs/Debian/Makefile +++ b/packaging/debs/Debian/Makefile @@ -22,14 +22,6 @@ package: clean tar -zxf $(DEBIAN_ORIG_TARBALL) cp -r debian $(UNPACKED_DIR) cp $(COMMON_DIR)/* $(UNPACKED_DIR)/debian/ -# Debian and descendants differ from most other distros in that -# runlevel 2 should start network services. - sed -i \ - -e 's|^LOCK_FILE=.*$$|LOCK_FILE=|' \ - -e 's|^START_PROG=.*$$|START_PROG="start-stop-daemon -v --chuid rabbitmq --start --exec"|' \ - -e 's|^\(# Default-Start:\).*$$|\1 2 3 4 5|' \ - -e 's|^\(# Default-Stop:\).*$$|\1 0 1 6|' \ - $(UNPACKED_DIR)/debian/rabbitmq-server.init sed -i -e 's|@SU_RABBITMQ_SH_C@|su rabbitmq -s /bin/sh -c|' \ $(UNPACKED_DIR)/debian/rabbitmq-script-wrapper chmod a+x $(UNPACKED_DIR)/debian/rules diff --git a/packaging/debs/Debian/debian/rabbitmq-server.default b/packaging/debs/Debian/debian/rabbitmq-server.default new file mode 100644 index 0000000000..bde5e30895 --- /dev/null +++ b/packaging/debs/Debian/debian/rabbitmq-server.default @@ -0,0 +1,9 @@ +# This file is sourced by /etc/init.d/rabbitmq-server. Its primary +# reason for existing is to allow adjustment of system limits for the +# rabbitmq-server process. +# +# Maximum number of open file handles. This will need to be increased +# to handle many simultaneous connections. Refer to the system +# documentation for ulimit (in man bash) for more information. +# +#ulimit -n 1024 diff --git a/packaging/debs/Debian/debian/rabbitmq-server.init b/packaging/debs/Debian/debian/rabbitmq-server.init new file mode 100644 index 0000000000..f514b9744b --- /dev/null +++ b/packaging/debs/Debian/debian/rabbitmq-server.init @@ -0,0 +1,185 @@ +#!/bin/sh +# +# rabbitmq-server RabbitMQ broker +# +# chkconfig: - 80 05 +# description: Enable AMQP service provided by RabbitMQ +# + +### BEGIN INIT INFO +# Provides: rabbitmq-server +# Required-Start: $remote_fs $network +# Required-Stop: $remote_fs $network +# Default-Start: 2 3 4 5 +# Default-Stop: 0 1 6 +# Description: RabbitMQ broker +# Short-Description: Enable AMQP service provided by RabbitMQ broker +### END INIT INFO + +PATH=/sbin:/usr/sbin:/bin:/usr/bin +NAME=rabbitmq-server +DAEMON=/usr/sbin/${NAME} +CONTROL=/usr/sbin/rabbitmqctl +DESC="message broker" +USER=rabbitmq +ROTATE_SUFFIX= +INIT_LOG_DIR=/var/log/rabbitmq +PID_FILE=/var/run/rabbitmq/pid + + +test -x $DAEMON || exit 0 +test -x $CONTROL || exit 0 + +RETVAL=0 +set -e + +[ -f /etc/default/${NAME} ] && . /etc/default/${NAME} + +. /lib/lsb/init-functions +. /lib/init/vars.sh + +ensure_pid_dir () { + PID_DIR=`dirname ${PID_FILE}` + if [ ! -d ${PID_DIR} ] ; then + mkdir -p ${PID_DIR} + chown -R ${USER}:${USER} ${PID_DIR} + chmod 755 ${PID_DIR} + fi +} + +remove_pid () { + rm -f ${PID_FILE} + rmdir `dirname ${PID_FILE}` || : +} + +start_rabbitmq () { + status_rabbitmq quiet + if [ $RETVAL != 0 ] ; then + RETVAL=0 + ensure_pid_dir + set +e + RABBITMQ_PID_FILE=$PID_FILE start-stop-daemon --quiet \ + --chuid rabbitmq --start --exec $DAEMON \ + --pidfile "$RABBITMQ_PID_FILE" \ + > "${INIT_LOG_DIR}/startup_log" \ + 2> "${INIT_LOG_DIR}/startup_err" \ + 0<&- & + $CONTROL wait $PID_FILE >/dev/null 2>&1 + RETVAL=$? + set -e + if [ $RETVAL != 0 ] ; then + remove_pid + fi + else + RETVAL=3 + fi +} + +stop_rabbitmq () { + status_rabbitmq quiet + if [ $RETVAL = 0 ] ; then + set +e + $CONTROL stop ${PID_FILE} > ${INIT_LOG_DIR}/shutdown_log 2> ${INIT_LOG_DIR}/shutdown_err + RETVAL=$? + set -e + if [ $RETVAL = 0 ] ; then + remove_pid + fi + else + RETVAL=3 + fi +} + +status_rabbitmq() { + set +e + if [ "$1" != "quiet" ] ; then + $CONTROL status 2>&1 + else + $CONTROL status > /dev/null 2>&1 + fi + if [ $? != 0 ] ; then + RETVAL=3 + fi + set -e +} + +rotate_logs_rabbitmq() { + set +e + $CONTROL -q rotate_logs ${ROTATE_SUFFIX} + if [ $? != 0 ] ; then + RETVAL=1 + fi + set -e +} + +restart_running_rabbitmq () { + status_rabbitmq quiet + if [ $RETVAL = 0 ] ; then + restart_rabbitmq + else + log_warning_msg "${DESC} not running" + fi +} + +restart_rabbitmq() { + stop_rabbitmq + start_rabbitmq +} + +restart_end() { + if [ $RETVAL = 0 ] ; then + log_end_msg 0 + else + log_end_msg 1 + fi +} + +start_stop_end() { + case "$RETVAL" in + 0) + log_end_msg 0;; + 3) + log_warning_msg "${DESC} already ${1}" + log_end_msg 0;; + *) + log_warning_msg "FAILED - check ${INIT_LOG_DIR}/startup_\{log, _err\}" + log_end_msg 1;; + esac +} + +case "$1" in + start) + log_daemon_msg "Starting ${DESC}" $NAME + start_rabbitmq + start_stop_end "started" + ;; + stop) + log_daemon_msg "Stopping ${DESC}" $NAME + stop_rabbitmq + start_stop_end "stopped" + ;; + status) + status_rabbitmq + ;; + rotate-logs) + log_action_begin_msg "Rotating log files for ${DESC} ${NAME}" + rotate_logs_rabbitmq + log_action_end_msg $RETVAL + ;; + force-reload|reload|restart) + log_daemon_msg "Restarting ${DESC}" $NAME + restart_rabbitmq + restart_end + ;; + try-restart) + log_daemon_msg "Restarting ${DESC}" $NAME + restart_running_rabbitmq + restart_end + ;; + *) + echo "Usage: $0 {start|stop|status|rotate-logs|restart|condrestart|try-restart|reload|force-reload}" >&2 + RETVAL=1 + ;; +esac + +exit $RETVAL diff --git a/packaging/debs/Debian/debian/rules b/packaging/debs/Debian/debian/rules index 108b1ed512..ecb778df96 100644 --- a/packaging/debs/Debian/debian/rules +++ b/packaging/debs/Debian/debian/rules @@ -19,3 +19,4 @@ install/rabbitmq-server:: done sed -e 's|@RABBIT_LIB@|/usr/lib/rabbitmq/lib/rabbitmq_server-$(DEB_UPSTREAM_VERSION)|g' <debian/postrm.in >debian/postrm install -p -D -m 0755 debian/rabbitmq-server.ocf $(DEB_DESTDIR)usr/lib/ocf/resource.d/rabbitmq/rabbitmq-server + install -p -D -m 0644 debian/rabbitmq-server.default $(DEB_DESTDIR)etc/default/rabbitmq-server diff --git a/src/mirrored_supervisor.erl b/src/mirrored_supervisor.erl index 221f6a8727..4fe9398108 100644 --- a/src/mirrored_supervisor.erl +++ b/src/mirrored_supervisor.erl @@ -261,24 +261,19 @@ start_internal(Group, ChildSpecs) -> %%---------------------------------------------------------------------------- -init({overall, Group, Init}) -> - case Init of - {ok, {Restart, ChildSpecs}} -> - Delegate = {delegate, {?SUPERVISOR, start_link, - [?MODULE, {delegate, Restart}]}, - temporary, 16#ffffffff, supervisor, [?SUPERVISOR]}, - Mirroring = {mirroring, {?MODULE, start_internal, - [Group, ChildSpecs]}, - permanent, 16#ffffffff, worker, [?MODULE]}, - %% Important: Delegate MUST start before Mirroring so that - %% when we shut down from above it shuts down last, so - %% Mirroring does not see it die. - %% - %% See comment in handle_info('DOWN', ...) below - {ok, {{one_for_all, 0, 1}, [Delegate, Mirroring]}}; - ignore -> - ignore - end; +init({overall, _Group, ignore}) -> ignore; +init({overall, Group, {ok, {Restart, ChildSpecs}}}) -> + %% Important: Delegate MUST start before Mirroring so that when we + %% shut down from above it shuts down last, so Mirroring does not + %% see it die. + %% + %% See comment in handle_info('DOWN', ...) below + {ok, {{one_for_all, 0, 1}, + [{delegate, {?SUPERVISOR, start_link, [?MODULE, {delegate, Restart}]}, + temporary, 16#ffffffff, supervisor, [?SUPERVISOR]}, + {mirroring, {?MODULE, start_internal, [Group, ChildSpecs]}, + permanent, 16#ffffffff, worker, [?MODULE]}]}}; + init({delegate, Restart}) -> {ok, {Restart, []}}; @@ -306,9 +301,9 @@ handle_call({init, Overall}, _From, Delegate = child(Overall, delegate), erlang:monitor(process, Delegate), State1 = State#state{overall = Overall, delegate = Delegate}, - case all_started([maybe_start(Group, Delegate, S) || S <- ChildSpecs]) of - true -> {reply, ok, State1}; - false -> {stop, shutdown, State1} + case errors([maybe_start(Group, Delegate, S) || S <- ChildSpecs]) of + [] -> {reply, ok, State1}; + Errors -> {stop, {shutdown, Errors}, State1} end; handle_call({start_child, ChildSpec}, _From, @@ -372,9 +367,9 @@ handle_info({'DOWN', _Ref, process, Pid, _Reason}, [start(Delegate, ChildSpec) || ChildSpec <- ChildSpecs]; _ -> [] end, - case all_started(R) of - true -> {noreply, State}; - false -> {stop, shutdown, State} + case errors(R) of + [] -> {noreply, State}; + Errors -> {stop, {shutdown, Errors}, State} end; handle_info(Info, State) -> @@ -468,7 +463,7 @@ delete_all(Group) -> [delete(Group, id(C)) || C <- mnesia:select(?TABLE, [{MatchHead, [], ['$1']}])]. -all_started(Results) -> [] =:= [R || R = {error, _} <- Results]. +errors(Results) -> [E || {error, E} <- Results]. %%---------------------------------------------------------------------------- diff --git a/src/mirrored_supervisor_tests.erl b/src/mirrored_supervisor_tests.erl index e8baabe8ff..60192b34d7 100644 --- a/src/mirrored_supervisor_tests.erl +++ b/src/mirrored_supervisor_tests.erl @@ -51,7 +51,7 @@ test_migrate() -> with_sups(fun([A, _]) -> ?MS:start_child(a, childspec(worker)), Pid1 = pid_of(worker), - kill(A, Pid1), + kill_registered(A, Pid1), Pid2 = pid_of(worker), false = (Pid1 =:= Pid2) end, [a, b]). @@ -61,10 +61,10 @@ test_migrate_twice() -> with_sups(fun([A, B]) -> ?MS:start_child(a, childspec(worker)), Pid1 = pid_of(worker), - kill(A, Pid1), + kill_registered(A, Pid1), {ok, C} = start_sup(c), Pid2 = pid_of(worker), - kill(B, Pid2), + kill_registered(B, Pid2), Pid3 = pid_of(worker), false = (Pid1 =:= Pid3), kill(C) @@ -124,7 +124,7 @@ test_large_group() -> with_sups(fun([A, _, _, _]) -> ?MS:start_child(a, childspec(worker)), Pid1 = pid_of(worker), - kill(A, Pid1), + kill_registered(A, Pid1), Pid2 = pid_of(worker), false = (Pid1 =:= Pid2) end, [a, b, c, d]). @@ -134,7 +134,7 @@ test_childspecs_at_init() -> S = childspec(worker), with_sups(fun([A, _]) -> Pid1 = pid_of(worker), - kill(A, Pid1), + kill_registered(A, Pid1), Pid2 = pid_of(worker), false = (Pid1 =:= Pid2) end, [{a, [S]}, {b, [S]}]). @@ -143,7 +143,7 @@ test_anonymous_supervisors() -> with_sups(fun([A, _B]) -> ?MS:start_child(A, childspec(worker)), Pid1 = pid_of(worker), - kill(A, Pid1), + kill_registered(A, Pid1), Pid2 = pid_of(worker), false = (Pid1 =:= Pid2) end, [anon, anon]). @@ -289,6 +289,12 @@ kill(Pid, Waits) -> kill_wait(Pid), [kill_wait(P) || P <- Waits]. +kill_registered(Pid, Child) -> + {registered_name, Name} = erlang:process_info(Child, registered_name), + kill(Pid, Child), + false = (Child =:= whereis(Name)), + ok. + kill_wait(Pid) -> receive {'DOWN', _Ref, process, Pid, _Reason} -> diff --git a/src/rabbit.erl b/src/rabbit.erl index eec7e34e8e..ea9731b68b 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -197,7 +197,7 @@ rabbit_queue_index, gen, dict, ordsets, file_handle_cache, rabbit_msg_store, array, rabbit_msg_store_ets_index, rabbit_msg_file, rabbit_exchange_type_fanout, rabbit_exchange_type_topic, mnesia, - mnesia_lib, rpc, mnesia_tm, qlc, sofs, proplists, credit_flow]). + mnesia_lib, rpc, mnesia_tm, qlc, sofs, proplists, credit_flow, pmon]). %% HiPE compilation uses multiple cores anyway, but some bits are %% IO-bound so we can go faster if we parallelise a bit more. In @@ -347,10 +347,7 @@ status() -> is_running() -> is_running(node()). is_running(Node) -> - case rpc:call(Node, application, which_applications, [infinity]) of - {badrpc, _} -> false; - Apps -> proplists:is_defined(rabbit, Apps) - end. + rabbit_nodes:is_running(Node, rabbit). environment() -> lists:keysort( @@ -624,15 +621,12 @@ log_location(Type) -> rotate_logs(File, Suffix, Handler) -> rotate_logs(File, Suffix, Handler, Handler). -rotate_logs(File, Suffix, OldHandler, NewHandler) -> - case File of - undefined -> ok; - tty -> ok; - _ -> gen_event:swap_handler( - error_logger, - {OldHandler, swap}, - {NewHandler, {File, Suffix}}) - end. +rotate_logs(undefined, _Suffix, _OldHandler, _NewHandler) -> ok; +rotate_logs(tty, _Suffix, _OldHandler, _NewHandler) -> ok; +rotate_logs(File, Suffix, OldHandler, NewHandler) -> + gen_event:swap_handler(error_logger, + {OldHandler, swap}, + {NewHandler, {File, Suffix}}). log_rotation_result({error, MainLogError}, {error, SaslLogError}) -> {error, {{cannot_rotate_main_logs, MainLogError}, diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 2063e557f4..5701efeb99 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -696,12 +696,18 @@ calculate_msg_expiry(TTL) -> now_micros() + (TTL * 1000). drop_expired_messages(State = #q{ttl = undefined}) -> State; drop_expired_messages(State = #q{backing_queue_state = BQS, - backing_queue = BQ}) -> + backing_queue = BQ }) -> Now = now_micros(), - BQS1 = BQ:dropwhile( - fun (#message_properties{expiry = Expiry}) -> Now > Expiry end, - dead_letter_fun(expired, State), - BQS), + DLXFun = dead_letter_fun(expired, State), + ExpirePred = fun (#message_properties{expiry = Expiry}) -> Now > Expiry end, + case DLXFun of + undefined -> {undefined, BQS1} = BQ:dropwhile(ExpirePred, false, BQS), + BQS1; + _ -> {Msgs, BQS1} = BQ:dropwhile(ExpirePred, true, BQS), + lists:foreach( + fun({Msg, AckTag}) -> DLXFun(Msg, AckTag) end, Msgs), + BQS1 + end, ensure_ttl_timer(State#q{backing_queue_state = BQS1}). ensure_ttl_timer(State = #q{backing_queue = BQ, @@ -717,6 +723,14 @@ ensure_ttl_timer(State = #q{backing_queue = BQ, ensure_ttl_timer(State) -> State. +ack_if_no_dlx(AckTags, State = #q{dlx = undefined, + backing_queue = BQ, + backing_queue_state = BQS }) -> + {_Guids, BQS1} = BQ:ack(AckTags, BQS), + State#q{backing_queue_state = BQS1}; +ack_if_no_dlx(_AckTags, State) -> + State. + dead_letter_fun(_Reason, #q{dlx = undefined}) -> undefined; dead_letter_fun(Reason, _State) -> @@ -724,31 +738,24 @@ dead_letter_fun(Reason, _State) -> gen_server2:cast(self(), {dead_letter, {Msg, AckTag}, Reason}) end. -dead_letter_msg(Msg, AckTag, Reason, State = #q{dlx = DLX}) -> - case rabbit_exchange:lookup(DLX) of - {error, not_found} -> noreply(State); - _ -> dead_letter_msg_existing_dlx(Msg, AckTag, Reason, - State) +dead_letter_publish(Msg, Reason, State = #q{publish_seqno = MsgSeqNo}) -> + DLMsg = #basic_message{exchange_name = XName} = + make_dead_letter_msg(Reason, Msg, State), + case rabbit_exchange:lookup(XName) of + {ok, X} -> + Delivery = rabbit_basic:delivery(false, false, DLMsg, MsgSeqNo), + {Queues, Cycles} = detect_dead_letter_cycles( + DLMsg, rabbit_exchange:route(X, Delivery)), + lists:foreach(fun log_cycle_once/1, Cycles), + QPids = rabbit_amqqueue:lookup(Queues), + {_, DeliveredQPids} = rabbit_amqqueue:deliver(QPids, Delivery), + DeliveredQPids; + {error, not_found} -> + [] end. -dead_letter_publish(Msg, Reason, - State = #q{publish_seqno = MsgSeqNo, - dlx = DLX}) -> - Delivery = #delivery{message = #basic_message{exchange_name = XName}} = - rabbit_basic:delivery( - false, false, make_dead_letter_msg(DLX, Reason, Msg, State), - MsgSeqNo), - {ok, X} = rabbit_exchange:lookup(XName), - Queues = rabbit_exchange:route(X, Delivery), - {Queues1, Cycles} = detect_dead_letter_cycles(Delivery, Queues), - lists:foreach(fun log_cycle_once/1, Cycles), - QPids = rabbit_amqqueue:lookup(Queues1), - {_, DeliveredQPids} = rabbit_amqqueue:deliver(QPids, Delivery), - DeliveredQPids. - -dead_letter_msg_existing_dlx(Msg, AckTag, Reason, - State = #q{publish_seqno = MsgSeqNo, - unconfirmed = UC}) -> +dead_letter_msg(Msg, AckTag, Reason, State = #q{publish_seqno = MsgSeqNo, + unconfirmed = UC}) -> QPids = dead_letter_publish(Msg, Reason, State), State1 = State#q{queue_monitors = pmon:monitor_all( QPids, State#q.queue_monitors), @@ -807,8 +814,7 @@ cleanup_after_confirm(AckTags, State = #q{delayed_stop = DS, false -> noreply(State1) end. -detect_dead_letter_cycles(#delivery{message = #basic_message{content = Content}}, - Queues) -> +detect_dead_letter_cycles(#basic_message{content = Content}, Queues) -> #content{properties = #'P_basic'{headers = Headers}} = rabbit_binary_parser:ensure_content_decoded(Content), NoCycles = {Queues, []}, @@ -835,31 +841,31 @@ detect_dead_letter_cycles(#delivery{message = #basic_message{content = Content}} end end. -make_dead_letter_msg(DLX, Reason, +make_dead_letter_msg(Reason, Msg = #basic_message{content = Content, exchange_name = Exchange, routing_keys = RoutingKeys}, - State = #q{dlx_routing_key = DlxRoutingKey}) -> + State = #q{dlx = DLX, dlx_routing_key = DlxRoutingKey}) -> {DeathRoutingKeys, HeadersFun1} = case DlxRoutingKey of undefined -> {RoutingKeys, fun (H) -> H end}; _ -> {[DlxRoutingKey], fun (H) -> lists:keydelete(<<"CC">>, 1, H) end} end, + ReasonBin = list_to_binary(atom_to_list(Reason)), #resource{name = QName} = qname(State), + TimeSec = rabbit_misc:now_ms() div 1000, HeadersFun2 = fun (Headers) -> %% The first routing key is the one specified in the %% basic.publish; all others are CC or BCC keys. - RoutingKeys1 = - [hd(RoutingKeys) | rabbit_basic:header_routes(Headers)], - Info = [{<<"reason">>, - longstr, list_to_binary(atom_to_list(Reason))}, - {<<"queue">>, longstr, QName}, - {<<"time">>, timestamp, rabbit_misc:now_ms() div 1000}, - {<<"exchange">>, longstr, Exchange#resource.name}, - {<<"routing-keys">>, array, - [{longstr, Key} || Key <- RoutingKeys1]}], + RKs = [hd(RoutingKeys) | rabbit_basic:header_routes(Headers)], + RKs1 = [{longstr, Key} || Key <- RKs], + Info = [{<<"reason">>, longstr, ReasonBin}, + {<<"queue">>, longstr, QName}, + {<<"time">>, timestamp, TimeSec}, + {<<"exchange">>, longstr, Exchange#resource.name}, + {<<"routing-keys">>, array, RKs1}], HeadersFun1(rabbit_basic:append_table_header(<<"x-death">>, Info, Headers)) end, @@ -1234,11 +1240,13 @@ handle_cast({reject, AckTags, Requeue, ChPid}, State) -> ChPid, AckTags, State, case Requeue of true -> fun (State1) -> requeue_and_run(AckTags, State1) end; - false -> Fun = dead_letter_fun(rejected, State), - fun (State1 = #q{backing_queue = BQ, + false -> fun (State1 = #q{backing_queue = BQ, backing_queue_state = BQS}) -> + Fun = dead_letter_fun(rejected, State1), BQS1 = BQ:fold(Fun, BQS, AckTags), - State1#q{backing_queue_state = BQS1} + ack_if_no_dlx( + AckTags, + State1#q{backing_queue_state = BQS1}) end end)); diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 6cc1c3fd3e..dc144a0e53 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -26,15 +26,14 @@ ('empty' | %% Message, IsDelivered, AckTag, Remaining_Len {rabbit_types:basic_message(), boolean(), Ack, non_neg_integer()})). --type(is_durable() :: boolean()). -type(attempt_recovery() :: boolean()). -type(purged_msg_count() :: non_neg_integer()). --type(confirm_required() :: boolean()). -type(async_callback() :: fun ((atom(), fun ((atom(), state()) -> state())) -> 'ok')). -type(duration() :: ('undefined' | 'infinity' | number())). -type(msg_fun() :: fun((rabbit_types:basic_message(), ack()) -> 'ok') | 'undefined'). +-type(msg_pred() :: fun ((rabbit_types:message_properties()) -> boolean())). %% Called on startup with a list of durable queue names. The queues %% aren't being started at this point, but this call allows the @@ -117,12 +116,14 @@ %% be ignored. -callback drain_confirmed(state()) -> {[rabbit_guid:guid()], state()}. -%% Drop messages from the head of the queue while the supplied -%% predicate returns true. A callback function is supplied allowing -%% callers access to messages that are about to be dropped. --callback dropwhile(fun ((rabbit_types:message_properties()) -> boolean()), msg_fun(), - state()) - -> state(). +%% Drop messages from the head of the queue while the supplied predicate returns +%% true. Also accepts a boolean parameter that determines whether the messages +%% necessitate an ack or not. If they do, the function returns a list of +%% messages with the respective acktags. +-callback dropwhile(msg_pred(), true, state()) + -> {[{rabbit_types:basic_message(), ack()}], state()}; + (msg_pred(), false, state()) + -> {undefined, state()}. %% Produce the next message. -callback fetch(true, state()) -> {fetch_result(ack()), state()}; diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl index 286b69e4ac..a84800c0ec 100644 --- a/src/rabbit_backing_queue_qc.erl +++ b/src/rabbit_backing_queue_qc.erl @@ -141,7 +141,7 @@ qc_drain_confirmed(#state{bqstate = BQ}) -> {call, ?BQMOD, drain_confirmed, [BQ]}. qc_dropwhile(#state{bqstate = BQ}) -> - {call, ?BQMOD, dropwhile, [fun dropfun/1, fun (_,_) -> ok end, BQ]}. + {call, ?BQMOD, dropwhile, [fun dropfun/1, false, BQ]}. qc_is_empty(#state{bqstate = BQ}) -> {call, ?BQMOD, is_empty, [BQ]}. @@ -267,10 +267,11 @@ next_state(S, Res, {call, ?BQMOD, drain_confirmed, _Args}) -> BQ1 = {call, erlang, element, [2, Res]}, S#state{bqstate = BQ1}; -next_state(S, BQ1, {call, ?BQMOD, dropwhile, _Args}) -> +next_state(S, Res, {call, ?BQMOD, dropwhile, _Args}) -> + BQ = {call, erlang, element, [2, Res]}, #state{messages = Messages} = S, Msgs1 = drop_messages(Messages), - S#state{bqstate = BQ1, len = gb_trees:size(Msgs1), messages = Msgs1}; + S#state{bqstate = BQ, len = gb_trees:size(Msgs1), messages = Msgs1}; next_state(S, _Res, {call, ?BQMOD, is_empty, _Args}) -> S; diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index 8ad5901629..734456d35f 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -63,7 +63,7 @@ -spec(extract_headers/1 :: (rabbit_types:content()) -> headers()). --spec(map_headers/2 :: (rabbit_types:content(), fun((headers()) -> headers())) +-spec(map_headers/2 :: (fun((headers()) -> headers()), rabbit_types:content()) -> rabbit_types:content()). -spec(header_routes/1 :: @@ -224,6 +224,5 @@ header_routes(HeadersTable) -> {array, Routes} -> [Route || {longstr, Route} <- Routes]; undefined -> []; {Type, _Val} -> throw({error, {unacceptable_type_in_header, - Type, - binary_to_list(HeaderKey)}}) + binary_to_list(HeaderKey), Type}}) end || HeaderKey <- ?ROUTING_HEADERS]). diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 846890a145..22c6a22361 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -36,9 +36,9 @@ conn_name, limiter, tx_status, next_tag, unacked_message_q, uncommitted_message_q, uncommitted_acks, uncommitted_nacks, user, virtual_host, most_recently_declared_queue, queue_monitors, - consumer_mapping, blocking, queue_consumers, queue_collector_pid, - stats_timer, confirm_enabled, publish_seqno, unconfirmed, - confirmed, capabilities, trace_state}). + consumer_mapping, blocking, queue_consumers, delivering_queues, + queue_collector_pid, stats_timer, confirm_enabled, publish_seqno, + unconfirmed, confirmed, capabilities, trace_state}). -define(MAX_PERMISSION_CACHE_SIZE, 12). @@ -198,6 +198,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, consumer_mapping = dict:new(), blocking = sets:new(), queue_consumers = dict:new(), + delivering_queues = sets:new(), queue_collector_pid = CollectorPid, confirm_enabled = false, publish_seqno = 1, @@ -331,10 +332,11 @@ handle_info({'DOWN', _MRef, process, QPid, Reason}, State) -> State1 = handle_publishing_queue_down(QPid, Reason, State), State2 = queue_blocked(QPid, State1), State3 = handle_consuming_queue_down(QPid, State2), + State4 = handle_delivering_queue_down(QPid, State3), credit_flow:peer_down(QPid), erase_queue_stats(QPid), noreply(State3#ch{queue_monitors = pmon:erase( - QPid, State3#ch.queue_monitors)}); + QPid, State4#ch.queue_monitors)}); handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}. @@ -657,7 +659,7 @@ handle_method(#'basic.get'{queue = QueueNameBin, QueueName, ConnPid, fun (Q) -> rabbit_amqqueue:basic_get(Q, self(), NoAck) end) of {ok, MessageCount, - Msg = {_QName, _QPid, _MsgId, Redelivered, + Msg = {_QName, QPid, _MsgId, Redelivered, #basic_message{exchange_name = ExchangeName, routing_keys = [RoutingKey | _CcRoutes], content = Content}}} -> @@ -669,7 +671,8 @@ handle_method(#'basic.get'{queue = QueueNameBin, routing_key = RoutingKey, message_count = MessageCount}, Content), - {noreply, record_sent(none, not(NoAck), Msg, State)}; + State1 = monitor_delivering_queue(NoAck, QPid, State), + {noreply, record_sent(none, not(NoAck), Msg, State1)}; empty -> {reply, #'basic.get_empty'{}, State} end; @@ -707,10 +710,10 @@ handle_method(#'basic.consume'{queue = QueueNameBin, consumer_tag = ActualConsumerTag})), Q} end) of - {ok, Q} -> - State1 = State#ch{consumer_mapping = - dict:store(ActualConsumerTag, Q, - ConsumerMapping)}, + {ok, Q = #amqqueue{pid = QPid}} -> + CM1 = dict:store(ActualConsumerTag, Q, ConsumerMapping), + State1 = monitor_delivering_queue( + NoAck, QPid, State#ch{consumer_mapping = CM1}), {noreply, case NoWait of true -> consumer_monitor(ActualConsumerTag, State1); @@ -1108,6 +1111,13 @@ consumer_monitor(ConsumerTag, State end. +monitor_delivering_queue(true, _QPid, State) -> + State; +monitor_delivering_queue(false, QPid, State = #ch{queue_monitors = QMons, + delivering_queues = DQ}) -> + State#ch{queue_monitors = pmon:monitor(QPid, QMons), + delivering_queues = sets:add_element(QPid, DQ)}. + handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC}) -> case rabbit_misc:is_abnormal_termination(Reason) of true -> {MXs, UC1} = dtree:take_all(QPid, UC), @@ -1134,6 +1144,9 @@ handle_consuming_queue_down(QPid, State#ch{consumer_mapping = ConsumerMapping1, queue_consumers = dict:erase(QPid, QCons)}. +handle_delivering_queue_down(QPid, State = #ch{delivering_queues = DQ}) -> + State#ch{delivering_queues = sets:del_element(QPid, DQ)}. + binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin, RoutingKey, Arguments, ReturnMethod, NoWait, State = #ch{virtual_host = VHostPath, @@ -1269,9 +1282,11 @@ new_tx(State) -> State#ch{uncommitted_message_q = queue:new(), notify_queues(State = #ch{state = closing}) -> {ok, State}; -notify_queues(State = #ch{consumer_mapping = Consumers}) -> - {rabbit_amqqueue:notify_down_all(consumer_queues(Consumers), self()), - State#ch{state = closing}}. +notify_queues(State = #ch{consumer_mapping = Consumers, + delivering_queues = DQ }) -> + QPids = sets:to_list( + sets:union(sets:from_list(consumer_queues(Consumers)), DQ)), + {rabbit_amqqueue:notify_down_all(QPids, self()), State#ch{state = closing}}. fold_per_queue(_F, Acc, []) -> Acc; diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 9f99f17a0b..2dea2a2f34 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -194,7 +194,11 @@ action(force_cluster, Node, ClusterNodeSs, _Opts, Inform) -> action(wait, Node, [PidFile], _Opts, Inform) -> Inform("Waiting for ~p", [Node]), - wait_for_application(Node, PidFile, Inform); + wait_for_application(Node, PidFile, rabbit, Inform); + +action(wait, Node, [PidFile, App], _Opts, Inform) -> + Inform("Waiting for ~p on ~p", [App, Node]), + wait_for_application(Node, PidFile, list_to_atom(App), Inform); action(status, Node, [], _Opts, Inform) -> Inform("Status of node ~p", [Node]), @@ -212,33 +216,33 @@ action(rotate_logs, Node, [], _Opts, Inform) -> Inform("Reopening logs for node ~p", [Node]), call(Node, {rabbit, rotate_logs, [""]}); action(rotate_logs, Node, Args = [Suffix], _Opts, Inform) -> - Inform("Rotating logs to files with suffix ~p", [Suffix]), + Inform("Rotating logs to files with suffix \"~s\"", [Suffix]), call(Node, {rabbit, rotate_logs, Args}); action(close_connection, Node, [PidStr, Explanation], _Opts, Inform) -> - Inform("Closing connection ~s", [PidStr]), + Inform("Closing connection \"~s\"", [PidStr]), rpc_call(Node, rabbit_networking, close_connection, [rabbit_misc:string_to_pid(PidStr), Explanation]); action(add_user, Node, Args = [Username, _Password], _Opts, Inform) -> - Inform("Creating user ~p", [Username]), + Inform("Creating user \"~s\"", [Username]), call(Node, {rabbit_auth_backend_internal, add_user, Args}); action(delete_user, Node, Args = [_Username], _Opts, Inform) -> - Inform("Deleting user ~p", Args), + Inform("Deleting user \"~s\"", Args), call(Node, {rabbit_auth_backend_internal, delete_user, Args}); action(change_password, Node, Args = [Username, _Newpassword], _Opts, Inform) -> - Inform("Changing password for user ~p", [Username]), + Inform("Changing password for user \"~s\"", [Username]), call(Node, {rabbit_auth_backend_internal, change_password, Args}); action(clear_password, Node, Args = [Username], _Opts, Inform) -> - Inform("Clearing password for user ~p", [Username]), + Inform("Clearing password for user \"~s\"", [Username]), call(Node, {rabbit_auth_backend_internal, clear_password, Args}); action(set_user_tags, Node, [Username | TagsStr], _Opts, Inform) -> Tags = [list_to_atom(T) || T <- TagsStr], - Inform("Setting tags for user ~p to ~p", [Username, Tags]), + Inform("Setting tags for user \"~s\" to ~p", [Username, Tags]), rpc_call(Node, rabbit_auth_backend_internal, set_tags, [list_to_binary(Username), Tags]); @@ -249,11 +253,11 @@ action(list_users, Node, [], _Opts, Inform) -> rabbit_auth_backend_internal:user_info_keys()); action(add_vhost, Node, Args = [_VHostPath], _Opts, Inform) -> - Inform("Creating vhost ~p", Args), + Inform("Creating vhost \"~s\"", Args), call(Node, {rabbit_vhost, add, Args}); action(delete_vhost, Node, Args = [_VHostPath], _Opts, Inform) -> - Inform("Deleting vhost ~p", Args), + Inform("Deleting vhost \"~s\"", Args), call(Node, {rabbit_vhost, delete, Args}); action(list_vhosts, Node, Args, _Opts, Inform) -> @@ -315,12 +319,12 @@ action(list_consumers, Node, _Args, Opts, Inform) -> action(trace_on, Node, [], Opts, Inform) -> VHost = proplists:get_value(?VHOST_OPT, Opts), - Inform("Starting tracing for vhost ~p", [VHost]), + Inform("Starting tracing for vhost \"~s\"", [VHost]), rpc_call(Node, rabbit_trace, start, [list_to_binary(VHost)]); action(trace_off, Node, [], Opts, Inform) -> VHost = proplists:get_value(?VHOST_OPT, Opts), - Inform("Stopping tracing for vhost ~p", [VHost]), + Inform("Stopping tracing for vhost \"~s\"", [VHost]), rpc_call(Node, rabbit_trace, stop, [list_to_binary(VHost)]); action(set_vm_memory_high_watermark, Node, [Arg], _Opts, Inform) -> @@ -333,19 +337,21 @@ action(set_vm_memory_high_watermark, Node, [Arg], _Opts, Inform) -> action(set_permissions, Node, [Username, CPerm, WPerm, RPerm], Opts, Inform) -> VHost = proplists:get_value(?VHOST_OPT, Opts), - Inform("Setting permissions for user ~p in vhost ~p", [Username, VHost]), + Inform("Setting permissions for user \"~s\" in vhost \"~s\"", + [Username, VHost]), call(Node, {rabbit_auth_backend_internal, set_permissions, [Username, VHost, CPerm, WPerm, RPerm]}); action(clear_permissions, Node, [Username], Opts, Inform) -> VHost = proplists:get_value(?VHOST_OPT, Opts), - Inform("Clearing permissions for user ~p in vhost ~p", [Username, VHost]), + Inform("Clearing permissions for user \"~s\" in vhost \"~s\"", + [Username, VHost]), call(Node, {rabbit_auth_backend_internal, clear_permissions, [Username, VHost]}); action(list_permissions, Node, [], Opts, Inform) -> VHost = proplists:get_value(?VHOST_OPT, Opts), - Inform("Listing permissions in vhost ~p", [VHost]), + Inform("Listing permissions in vhost \"~s\"", [VHost]), display_info_list(call(Node, {rabbit_auth_backend_internal, list_vhost_permissions, [VHost]}), rabbit_auth_backend_internal:vhost_perms_info_keys()); @@ -396,17 +402,17 @@ action(eval, Node, [Expr], _Opts, _Inform) -> %%---------------------------------------------------------------------------- -wait_for_application(Node, PidFile, Inform) -> +wait_for_application(Node, PidFile, Application, Inform) -> Pid = read_pid_file(PidFile, true), Inform("pid is ~s", [Pid]), - wait_for_application(Node, Pid). + wait_for_application(Node, Pid, Application). -wait_for_application(Node, Pid) -> +wait_for_application(Node, Pid, Application) -> case process_up(Pid) of - true -> case rabbit:is_running(Node) of + true -> case rabbit_nodes:is_running(Node, Application) of true -> ok; false -> timer:sleep(?EXTERNAL_CHECK_INTERVAL), - wait_for_application(Node, Pid) + wait_for_application(Node, Pid, Application) end; false -> {error, process_not_running} end. diff --git a/src/rabbit_disk_monitor.erl b/src/rabbit_disk_monitor.erl index 831d52901b..b1750b61ad 100644 --- a/src/rabbit_disk_monitor.erl +++ b/src/rabbit_disk_monitor.erl @@ -87,9 +87,9 @@ init([Limit]) -> vm_memory_monitor:get_total_memory()} of {N1, N2} when is_integer(N1), is_integer(N2) -> {ok, set_disk_limits(State, Limit)}; - _ -> + Err -> rabbit_log:info("Disabling disk free space monitoring " - "on unsupported platform~n"), + "on unsupported platform: ~p~n", [Err]), {stop, unsupported_platform} end. @@ -167,7 +167,7 @@ get_disk_free(Dir, {unix, Sun}) get_disk_free(Dir, {unix, _}) -> parse_free_unix(rabbit_misc:os_cmd("/bin/df -kP " ++ Dir)); get_disk_free(Dir, {win32, _}) -> - parse_free_win32(rabbit_misc:os_cmd("dir /-C /W " ++ Dir)); + parse_free_win32(os:cmd("dir /-C /W \"" ++ Dir ++ [$"])); get_disk_free(_, _) -> unknown. diff --git a/src/rabbit_event.erl b/src/rabbit_event.erl index 4ec141cfd8..3f1b20fed6 100644 --- a/src/rabbit_event.erl +++ b/src/rabbit_event.erl @@ -139,6 +139,6 @@ notify_if(false, _Type, _Props) -> ok. notify(Type, Props) -> %% TODO: switch to os:timestamp() when we drop support for %% Erlang/OTP < R13B01 - gen_event:notify(rabbit_event, #event{type = Type, - props = Props, - timestamp = now()}). + gen_event:notify(?MODULE, #event{type = Type, + props = Props, + timestamp = now()}). diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 9fa6213b94..2b15498ed9 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -192,7 +192,7 @@ terminate(_, _) -> ok. code_change(_, State, _) -> - State. + {ok, State}. %%---------------------------------------------------------------------------- %% Internal plumbing diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl index 2d155d14e3..17e2ffb472 100644 --- a/src/rabbit_mirror_queue_coordinator.erl +++ b/src/rabbit_mirror_queue_coordinator.erl @@ -356,7 +356,7 @@ handle_cast(request_length, State = #state { length_fun = LengthFun }) -> handle_cast({ensure_monitoring, Pids}, State = #state { monitors = Mons }) -> noreply(State #state { monitors = pmon:monitor_all(Pids, Mons) }). -handle_info(send_gm_heartbeat, State = #state{gm = GM}) -> +handle_info(send_gm_heartbeat, State = #state { gm = GM }) -> gm:broadcast(GM, heartbeat), ensure_gm_heartbeat(), noreply(State); diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 04b7514f71..4e71cc43db 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -134,7 +134,7 @@ delete_and_terminate(Reason, State = #state { gm = GM, purge(State = #state { gm = GM, backing_queue = BQ, backing_queue_state = BQS }) -> - ok = gm:broadcast(GM, {set_length, 0}), + ok = gm:broadcast(GM, {set_length, 0, false}), {Count, BQS1} = BQ:purge(BQS), {Count, State #state { backing_queue_state = BQS1, set_delivered = 0 }}. @@ -168,19 +168,19 @@ publish_delivered(AckRequired, Msg = #basic_message { id = MsgId }, MsgProps, ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1, ack_msg_id = AM1 })}. -dropwhile(Pred, MsgFun, +dropwhile(Pred, AckRequired, State = #state{gm = GM, backing_queue = BQ, set_delivered = SetDelivered, backing_queue_state = BQS }) -> Len = BQ:len(BQS), - BQS1 = BQ:dropwhile(Pred, MsgFun, BQS), + {Msgs, BQS1} = BQ:dropwhile(Pred, AckRequired, BQS), Len1 = BQ:len(BQS1), - ok = gm:broadcast(GM, {set_length, Len1}), + ok = gm:broadcast(GM, {set_length, Len1, AckRequired}), Dropped = Len - Len1, SetDelivered1 = lists:max([0, SetDelivered - Dropped]), - State #state { backing_queue_state = BQS1, - set_delivered = SetDelivered1 }. + {Msgs, State #state { backing_queue_state = BQS1, + set_delivered = SetDelivered1 } }. drain_confirmed(State = #state { backing_queue = BQ, backing_queue_state = BQS, @@ -246,12 +246,9 @@ ack(AckTags, State = #state { gm = GM, {MsgIds, State #state { backing_queue_state = BQS1, ack_msg_id = AM1 }}. -fold(MsgFun, State = #state { gm = GM, - backing_queue = BQ, - backing_queue_state = BQS}, AckTags) -> - BQS1 = BQ:fold(MsgFun, BQS, AckTags), - ok = gm:broadcast(GM, {fold, MsgFun, AckTags}), - State #state { backing_queue_state = BQS1 }. +fold(MsgFun, State = #state { backing_queue = BQ, + backing_queue_state = BQS }, AckTags) -> + State #state { backing_queue_state = BQ:fold(MsgFun, BQS, AckTags) }. requeue(AckTags, State = #state { gm = GM, backing_queue = BQ, diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 4b095209da..e412fbbc5d 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -793,23 +793,27 @@ process_instruction({discard, ChPid, Msg = #basic_message { id = MsgId }}, {ok, State1 #state { sender_queues = SQ1, msg_id_status = MS1, backing_queue_state = BQS1 }}; -process_instruction({set_length, Length}, +process_instruction({set_length, Length, AckRequired}, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> QLen = BQ:len(BQS), ToDrop = QLen - Length, - {ok, case ToDrop >= 0 of - true -> BQS1 = - lists:foldl( - fun (const, BQSN) -> - {{_Msg, _IsDelivered, _AckTag, _Remaining}, - BQSN1} = BQ:fetch(false, BQSN), - BQSN1 - end, BQS, lists:duplicate(ToDrop, const)), - set_synchronised( - true, State #state { backing_queue_state = BQS1 }); - false -> State - end}; + {ok, + case ToDrop >= 0 of + true -> + State1 = + lists:foldl( + fun (const, StateN = #state {backing_queue_state = BQSN}) -> + {{#basic_message{id = MsgId}, _IsDelivered, AckTag, + _Remaining}, BQSN1} = BQ:fetch(AckRequired, BQSN), + maybe_store_ack( + AckRequired, MsgId, AckTag, + StateN #state { backing_queue_state = BQSN1 }) + end, State, lists:duplicate(ToDrop, const)), + set_synchronised(true, State1); + false -> + State + end}; process_instruction({fetch, AckRequired, MsgId, Remaining}, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> @@ -835,11 +839,6 @@ process_instruction({ack, MsgIds}, [] = MsgIds1 -- MsgIds, %% ASSERTION {ok, State #state { msg_id_ack = MA1, backing_queue_state = BQS1 }}; -process_instruction({fold, MsgFun, AckTags}, - State = #state { backing_queue = BQ, - backing_queue_state = BQS }) -> - BQS1 = BQ:fold(MsgFun, BQS, AckTags), - {ok, State #state { backing_queue_state = BQS1 }}; process_instruction({requeue, MsgIds}, State = #state { backing_queue = BQ, backing_queue_state = BQS, diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl index e6a0533547..1a12d43b74 100644 --- a/src/rabbit_net.erl +++ b/src/rabbit_net.erl @@ -18,8 +18,8 @@ -include("rabbit.hrl"). -export([is_ssl/1, ssl_info/1, controlling_process/2, getstat/2, - recv/1, async_recv/3, port_command/2, setopts/2, send/2, close/1, - maybe_fast_close/1, sockname/1, peername/1, peercert/1, + recv/1, async_recv/3, port_command/2, getopts/2, setopts/2, send/2, + close/1, maybe_fast_close/1, sockname/1, peername/1, peercert/1, connection_string/2]). %%--------------------------------------------------------------------------- @@ -34,6 +34,8 @@ -type(ok_val_or_error(A) :: rabbit_types:ok_or_error2(A, any())). -type(ok_or_any_error() :: rabbit_types:ok_or_error(any())). -type(socket() :: port() | #ssl_socket{}). +-type(opts() :: [{atom(), any()} | + {raw, non_neg_integer(), non_neg_integer(), binary()}]). -spec(is_ssl/1 :: (socket()) -> boolean()). -spec(ssl_info/1 :: (socket()) @@ -49,9 +51,12 @@ -spec(async_recv/3 :: (socket(), integer(), timeout()) -> rabbit_types:ok(any())). -spec(port_command/2 :: (socket(), iolist()) -> 'true'). --spec(setopts/2 :: (socket(), [{atom(), any()} | - {raw, non_neg_integer(), non_neg_integer(), - binary()}]) -> ok_or_any_error()). +-spec(getopts/2 :: (socket(), [atom() | {raw, + non_neg_integer(), + non_neg_integer(), + non_neg_integer() | binary()}]) + -> ok_val_or_error(opts())). +-spec(setopts/2 :: (socket(), opts()) -> ok_or_any_error()). -spec(send/2 :: (socket(), binary() | iolist()) -> ok_or_any_error()). -spec(close/1 :: (socket()) -> ok_or_any_error()). -spec(maybe_fast_close/1 :: (socket()) -> ok_or_any_error()). @@ -126,6 +131,11 @@ port_command(Sock, Data) when ?IS_SSL(Sock) -> port_command(Sock, Data) when is_port(Sock) -> erlang:port_command(Sock, Data). +getopts(Sock, Options) when ?IS_SSL(Sock) -> + ssl:getopts(Sock#ssl_socket.ssl, Options); +getopts(Sock, Options) when is_port(Sock) -> + inet:getopts(Sock, Options). + setopts(Sock, Options) when ?IS_SSL(Sock) -> ssl:setopts(Sock#ssl_socket.ssl, Options); setopts(Sock, Options) when is_port(Sock) -> diff --git a/src/rabbit_nodes.erl b/src/rabbit_nodes.erl index 28ba469863..1c23632d52 100644 --- a/src/rabbit_nodes.erl +++ b/src/rabbit_nodes.erl @@ -16,7 +16,7 @@ -module(rabbit_nodes). --export([names/1, diagnostics/1, make/1, parts/1, cookie_hash/0]). +-export([names/1, diagnostics/1, make/1, parts/1, cookie_hash/0, is_running/2]). -define(EPMD_TIMEOUT, 30000). @@ -32,6 +32,7 @@ -spec(make/1 :: ({string(), string()} | string()) -> node()). -spec(parts/1 :: (node() | string()) -> {string(), string()}). -spec(cookie_hash/0 :: () -> string()). +-spec(is_running/2 :: (node(), atom()) -> boolean()). -endif. @@ -91,3 +92,9 @@ parts(NodeStr) -> cookie_hash() -> base64:encode_to_string(erlang:md5(atom_to_list(erlang:get_cookie()))). + +is_running(Node, Application) -> + case rpc:call(Node, application, which_applications, [infinity]) of + {badrpc, _} -> false; + Apps -> proplists:is_defined(Application, Apps) + end. diff --git a/src/rabbit_plugins.erl b/src/rabbit_plugins.erl index 2a93c8f2f7..00880fb2e2 100644 --- a/src/rabbit_plugins.erl +++ b/src/rabbit_plugins.erl @@ -245,7 +245,7 @@ format_plugins(Pattern, Opts, PluginsFile, PluginsDir) -> {true, true} -> throw({error_string, "Cannot specify -m and -v together"}) end, - OnlyEnabled = proplists:get_bool(?ENABLED_OPT, Opts), + OnlyEnabled = proplists:get_bool(?ENABLED_OPT, Opts), OnlyEnabledAll = proplists:get_bool(?ENABLED_ALL_OPT, Opts), AvailablePlugins = find_plugins(PluginsDir), @@ -257,14 +257,10 @@ format_plugins(Pattern, Opts, PluginsFile, PluginsDir) -> Plugins = [ Plugin || Plugin = #plugin{name = Name} <- AvailablePlugins, re:run(atom_to_list(Name), RE, [{capture, none}]) =:= match, - if OnlyEnabled -> lists:member(Name, EnabledExplicitly); - true -> true - end, - if OnlyEnabledAll -> - lists:member(Name, EnabledImplicitly) or - lists:member(Name, EnabledExplicitly); - true -> - true + if OnlyEnabled -> lists:member(Name, EnabledExplicitly); + OnlyEnabledAll -> (lists:member(Name, EnabledExplicitly) or + lists:member(Name, EnabledImplicitly)); + true -> true end], Plugins1 = usort_plugins(Plugins), MaxWidth = lists:max([length(atom_to_list(Name)) || @@ -338,8 +334,8 @@ read_enabled_plugins(PluginsFile) -> case rabbit_file:read_term_file(PluginsFile) of {ok, [Plugins]} -> Plugins; {ok, []} -> []; - {ok, [_|_]} -> throw({error, {malformed_enabled_plugins_file, - PluginsFile}}); + {ok, [_|_]} -> throw({error, {malformed_enabled_plugins_file, + PluginsFile}}); {error, enoent} -> []; {error, Reason} -> throw({error, {cannot_read_enabled_plugins_file, PluginsFile, Reason}}) diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 5e9e78d380..5acf6acaf6 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -222,6 +222,14 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, last_blocked_by = none, last_blocked_at = never}, try + BufSizes = inet_op(fun () -> + rabbit_net:getopts( + ClientSock, [sndbuf, recbuf, buffer]) + end), + BufSz = lists:max([Sz || {_Opt, Sz} <- BufSizes]), + ok = inet_op(fun () -> + rabbit_net:setopts(ClientSock, [{buffer, BufSz}]) + end), recvloop(Deb, switch_callback(rabbit_event:init_stats_timer( State, #v1.stats_timer), handshake, 8)), diff --git a/src/rabbit_runtime_parameters.erl b/src/rabbit_runtime_parameters.erl index e1c417c410..172cee92db 100644 --- a/src/rabbit_runtime_parameters.erl +++ b/src/rabbit_runtime_parameters.erl @@ -88,7 +88,7 @@ set0(Component, Key, Term) -> mnesia_update(Component, Key, Term) -> rabbit_misc:execute_mnesia_transaction( fun () -> - Res = case mnesia:read(?TABLE, {Component, Key}) of + Res = case mnesia:read(?TABLE, {Component, Key}, read) of [] -> new; [Params] -> {old, Params#runtime_parameters.value} end, @@ -158,7 +158,7 @@ lookup0(Component, Key, DefaultFun) -> lookup_missing(Component, Key, Default) -> rabbit_misc:execute_mnesia_transaction( fun () -> - case mnesia:read(?TABLE, {Component, Key}) of + case mnesia:read(?TABLE, {Component, Key}, read) of [] -> Record = c(Component, Key, Default), mnesia:write(?TABLE, Record, write), Record; diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index a502fa95c3..67abe06efe 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -2421,10 +2421,10 @@ test_dropwhile(VQ0) -> fun (N, Props) -> Props#message_properties{expiry = N} end, VQ0), %% drop the first 5 messages - VQ2 = rabbit_variable_queue:dropwhile( - fun(#message_properties { expiry = Expiry }) -> - Expiry =< 5 - end, undefined, VQ1), + {undefined, VQ2} = rabbit_variable_queue:dropwhile( + fun(#message_properties { expiry = Expiry }) -> + Expiry =< 5 + end, false, VQ1), %% fetch five now VQ3 = lists:foldl(fun (_N, VQN) -> @@ -2441,11 +2441,13 @@ test_dropwhile(VQ0) -> test_dropwhile_varying_ram_duration(VQ0) -> VQ1 = variable_queue_publish(false, 1, VQ0), VQ2 = rabbit_variable_queue:set_ram_duration_target(0, VQ1), - VQ3 = rabbit_variable_queue:dropwhile( - fun(_) -> false end, undefined, VQ2), + {undefined, VQ3} = rabbit_variable_queue:dropwhile( + fun(_) -> false end, false, VQ2), VQ4 = rabbit_variable_queue:set_ram_duration_target(infinity, VQ3), VQ5 = variable_queue_publish(false, 1, VQ4), - rabbit_variable_queue:dropwhile(fun(_) -> false end, undefined, VQ5). + {undefined, VQ6} = + rabbit_variable_queue:dropwhile(fun(_) -> false end, false, VQ5), + VQ6. test_variable_queue_dynamic_duration_change(VQ0) -> SegmentSize = rabbit_queue_index:next_segment_boundary(0), diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index f9315c5dff..dafb3f2ee7 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -16,13 +16,12 @@ -module(rabbit_variable_queue). --export([init/3, terminate/2, delete_and_terminate/2, - purge/1, publish/4, publish_delivered/5, drain_confirmed/1, +-export([init/3, terminate/2, delete_and_terminate/2, purge/1, + publish/4, publish_delivered/5, drain_confirmed/1, dropwhile/3, fetch/2, ack/2, requeue/2, len/1, is_empty/1, - set_ram_duration_target/2, ram_duration/1, - needs_timeout/1, timeout/1, handle_pre_hibernate/1, - status/1, invoke/3, is_duplicate/2, discard/3, - multiple_routing_keys/0, fold/3]). + set_ram_duration_target/2, ram_duration/1, needs_timeout/1, + timeout/1, handle_pre_hibernate/1, status/1, invoke/3, + is_duplicate/2, discard/3, multiple_routing_keys/0, fold/3]). -export([start/1, stop/0]). @@ -324,7 +323,6 @@ -type(timestamp() :: {non_neg_integer(), non_neg_integer(), non_neg_integer()}). -type(seq_id() :: non_neg_integer()). --type(ack() :: seq_id()). -type(rates() :: #rates { egress :: {timestamp(), non_neg_integer()}, ingress :: {timestamp(), non_neg_integer()}, @@ -336,6 +334,13 @@ count :: non_neg_integer(), end_seq_id :: non_neg_integer() }). +%% The compiler (rightfully) complains that ack() and state() are +%% unused. For this reason we duplicate a -spec from +%% rabbit_backing_queue with the only intent being to remove +%% warnings. The problem here is that we can't parameterise the BQ +%% behaviour by these two types as we would like to. We still leave +%% these here for documentation purposes. +-type(ack() :: seq_id()). -type(state() :: #vqstate { q1 :: ?QUEUE:?QUEUE(), q2 :: ?QUEUE:?QUEUE(), @@ -369,6 +374,8 @@ ack_out_counter :: non_neg_integer(), ack_in_counter :: non_neg_integer(), ack_rates :: rates() }). +%% Duplicated from rabbit_backing_queue +-spec(ack/2 :: ([ack()], state()) -> {[rabbit_guid:guid()], state()}). -spec(multiple_routing_keys/0 :: () -> 'ok'). @@ -579,23 +586,27 @@ drain_confirmed(State = #vqstate { confirmed = C }) -> confirmed = gb_sets:new() }} end. -dropwhile(Pred, MsgFun, State) -> +dropwhile(Pred, AckRequired, State) -> dropwhile(Pred, AckRequired, State, []). + +dropwhile(Pred, AckRequired, State, Msgs) -> + End = fun(S) when AckRequired -> {lists:reverse(Msgs), S}; + (S) -> {undefined, S} + end, case queue_out(State) of {empty, State1} -> - a(State1); + End(a(State1)); {{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} -> - case {Pred(MsgProps), MsgFun} of - {true, undefined} -> - {_, State2} = internal_fetch(false, MsgStatus, State1), - dropwhile(Pred, MsgFun, State2); - {true, _} -> + case {Pred(MsgProps), AckRequired} of + {true, true} -> {MsgStatus1, State2} = read_msg(MsgStatus, State1), {{Msg, _, AckTag, _}, State3} = internal_fetch(true, MsgStatus1, State2), - MsgFun(Msg, AckTag), - dropwhile(Pred, MsgFun, State3); + dropwhile(Pred, AckRequired, State3, [{Msg, AckTag} | Msgs]); + {true, false} -> + {_, State2} = internal_fetch(false, MsgStatus, State1), + dropwhile(Pred, AckRequired, State2, undefined); {false, _} -> - a(in_r(MsgStatus, State1)) + End(a(in_r(MsgStatus, State1))) end end. diff --git a/src/supervisor2.erl b/src/supervisor2.erl index f1b748787a..3d3623d752 100644 --- a/src/supervisor2.erl +++ b/src/supervisor2.erl @@ -411,6 +411,8 @@ handle_call({start_child, EArgs}, _From, State) when ?is_simple(State) -> #child{mfa = {M, F, A}} = hd(State#state.children), Args = A ++ EArgs, case do_start_child_i(M, F, Args) of + {ok, undefined} -> + {reply, {ok, undefined}, State}; {ok, Pid} -> NState = State#state{dynamics = ?DICT:store(Pid, Args, State#state.dynamics)}, @@ -743,6 +745,8 @@ restart(Strategy, Child, State, Restart) #child{mfa = {M, F, A}} = Child, Dynamics = ?DICT:erase(Child#child.pid, State#state.dynamics), case do_start_child_i(M, F, A) of + {ok, undefined} -> + {ok, State}; {ok, Pid} -> NState = State#state{dynamics = ?DICT:store(Pid, A, Dynamics)}, {ok, NState}; |
