diff options
| author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2012-02-28 12:29:37 +0000 |
|---|---|---|
| committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2012-02-28 12:29:37 +0000 |
| commit | 4435c748294379add22aca9e3ee7273113daee18 (patch) | |
| tree | 260b85749a269e21313ff119a07ed3510a333f68 | |
| parent | 168c060308e2ccdba8ef6f8e06fe2e87400bc3b4 (diff) | |
| parent | 5e78aacf560b504834559e7e2b5a1fc9118983bd (diff) | |
| download | rabbitmq-server-git-4435c748294379add22aca9e3ee7273113daee18.tar.gz | |
merge bug24650 into default (Move names for connections and channels from mgmt to broker)
| -rw-r--r-- | Makefile | 2 | ||||
| -rw-r--r-- | docs/rabbitmq-echopid.xml | 71 | ||||
| -rw-r--r-- | docs/rabbitmqctl.1.xml | 7 | ||||
| -rw-r--r-- | ebin/rabbit_app.in | 1 | ||||
| -rw-r--r-- | include/rabbit_backing_queue_spec.hrl | 8 | ||||
| -rw-r--r-- | packaging/debs/apt-repository/README-real-repository | 4 | ||||
| -rw-r--r-- | packaging/macports/Portfile.in | 9 | ||||
| -rw-r--r-- | scripts/rabbitmq-echopid.bat | 49 | ||||
| -rw-r--r-- | src/rabbit.erl | 18 | ||||
| -rw-r--r-- | src/rabbit_amqqueue.erl | 25 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 349 | ||||
| -rw-r--r-- | src/rabbit_backing_queue.erl | 14 | ||||
| -rw-r--r-- | src/rabbit_basic.erl | 33 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 17 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_master.erl | 22 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 13 | ||||
| -rw-r--r-- | src/rabbit_misc.erl | 26 | ||||
| -rw-r--r-- | src/rabbit_ssl.erl | 41 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 7 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 43 |
20 files changed, 646 insertions, 113 deletions
@@ -17,7 +17,7 @@ BEAM_TARGETS=$(patsubst $(SOURCE_DIR)/%.erl, $(EBIN_DIR)/%.beam, $(SOURCES)) TARGETS=$(EBIN_DIR)/rabbit.app $(INCLUDE_DIR)/rabbit_framing.hrl $(BEAM_TARGETS) plugins WEB_URL=http://www.rabbitmq.com/ MANPAGES=$(patsubst %.xml, %.gz, $(wildcard $(DOCS_DIR)/*.[0-9].xml)) -WEB_MANPAGES=$(patsubst %.xml, %.man.xml, $(wildcard $(DOCS_DIR)/*.[0-9].xml) $(DOCS_DIR)/rabbitmq-service.xml) +WEB_MANPAGES=$(patsubst %.xml, %.man.xml, $(wildcard $(DOCS_DIR)/*.[0-9].xml) $(DOCS_DIR)/rabbitmq-service.xml $(DOCS_DIR)/rabbitmq-echopid.xml) USAGES_XML=$(DOCS_DIR)/rabbitmqctl.1.xml $(DOCS_DIR)/rabbitmq-plugins.1.xml USAGES_ERL=$(foreach XML, $(USAGES_XML), $(call usage_xml_to_erl, $(XML))) QC_MODULES := rabbit_backing_queue_qc diff --git a/docs/rabbitmq-echopid.xml b/docs/rabbitmq-echopid.xml new file mode 100644 index 0000000000..d3dcea521b --- /dev/null +++ b/docs/rabbitmq-echopid.xml @@ -0,0 +1,71 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!DOCTYPE refentry PUBLIC "-//OASIS//DTD DocBook XML V4.5//EN" "http://www.docbook.org/xml/4.5/docbookx.dtd"> +<refentry lang="en"> + <refentryinfo> + <productname>RabbitMQ Server</productname> + <authorgroup> + <corpauthor>The RabbitMQ Team <<ulink url="mailto:info@rabbitmq.com"><email>info@rabbitmq.com</email></ulink>></corpauthor> + </authorgroup> + </refentryinfo> + + <refmeta> + <refentrytitle>rabbitmq-echopid.bat</refentrytitle> + <refmiscinfo class="manual">RabbitMQ Server</refmiscinfo> + </refmeta> + + <refnamediv> + <refname>rabbitmq-echopid.bat</refname> + <refpurpose>return the process id of the Erlang runtime hosting RabbitMQ</refpurpose> + </refnamediv> + + <refsynopsisdiv> + <cmdsynopsis> + <command>rabbitmq-echopid.bat</command> + <arg choice="req">sname</arg> + </cmdsynopsis> + </refsynopsisdiv> + + <refsect1> + <title>Description</title> + <para> + RabbitMQ is an implementation of AMQP, the emerging + standard for high performance enterprise messaging. The + RabbitMQ server is a robust and scalable implementation of + an AMQP broker. + </para> + <para> + Running <command>rabbitmq-echopid</command> will attempt to + discover and echo the process id (PID) of the Erlang runtime + process (erl.exe) that is hosting RabbitMQ. To allow erl.exe + time to start up and load RabbitMQ, the script will wait for + ten seconds before timing out if a suitable PID cannot be + found. + </para> + <para> + If a PID is discovered, the script will echo it to stdout + before exiting with a ERRORLEVEL of 0. If no PID is + discovered before the timeout, nothing is written to stdout + and the script exits setting ERRORLEVEL to 1. + </para> + <para> + Note that this script only exists on Windows due to the need + to wait for erl.exe and possibly time-out. To obtain the PID + on Unix set RABBITMQ_PID_FILE before starting + rabbitmq-server and do not use "-detached". + </para> + </refsect1> + + <refsect1> + <title>Options</title> + <variablelist> + <varlistentry> + <term><cmdsynopsis><arg choice="req">sname</arg></cmdsynopsis></term> + <listitem> + <para role="usage"> +The short-name form of the RabbitMQ node name. + </para> + </listitem> + </varlistentry> + </variablelist> + </refsect1> +</refentry> diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index b531d46218..ee07ebfea9 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -1428,8 +1428,11 @@ <variablelist> <varlistentry> <term>fraction</term> - <listitem><para>The new memory threshhold fraction at which flow control is triggered, as a - floating point number between 0.0 and 1.0 with a mandatory fractional part.</para></listitem> + <listitem><para> + The new memory threshold fraction at which flow + control is triggered, as a floating point number + greater than or equal to 0. + </para></listitem> </varlistentry> </variablelist> </listitem> diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in index 2fee1114aa..fd19051d27 100644 --- a/ebin/rabbit_app.in +++ b/ebin/rabbit_app.in @@ -38,6 +38,7 @@ {delegate_count, 16}, {trace_vhosts, []}, {log_levels, [{connection, info}]}, + {ssl_cert_login_from, distinguished_name}, {tcp_listen_options, [binary, {packet, raw}, {reuseaddr, true}, diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl index 2a8cc13c92..bf93baba5d 100644 --- a/include/rabbit_backing_queue_spec.hrl +++ b/include/rabbit_backing_queue_spec.hrl @@ -25,6 +25,8 @@ -type(async_callback() :: fun ((atom(), fun ((atom(), state()) -> state())) -> 'ok')). -type(duration() :: ('undefined' | 'infinity' | number())). +-type(msg_fun() :: fun((rabbit_types:basic_message(), ack()) -> 'ok')). + -spec(start/1 :: ([rabbit_amqqueue:name()]) -> 'ok'). -spec(stop/0 :: () -> 'ok'). -spec(init/3 :: (rabbit_types:amqqueue(), attempt_recovery(), @@ -42,12 +44,14 @@ rabbit_types:message_properties(), pid(), state()) -> {undefined, state()}). -spec(drain_confirmed/1 :: (state()) -> {[rabbit_guid:guid()], state()}). --spec(dropwhile/2 :: - (fun ((rabbit_types:message_properties()) -> boolean()), state()) +-spec(dropwhile/3 :: + (fun ((rabbit_types:message_properties()) -> boolean()), + msg_fun() | 'undefined', state()) -> state()). -spec(fetch/2 :: (true, state()) -> {fetch_result(ack()), state()}; (false, state()) -> {fetch_result(undefined), state()}). -spec(ack/2 :: ([ack()], state()) -> {[rabbit_guid:guid()], state()}). +-spec(fold/3 :: (msg_fun(), state(), [ack()]) -> state()). -spec(requeue/2 :: ([ack()], state()) -> {[rabbit_guid:guid()], state()}). -spec(len/1 :: (state()) -> non_neg_integer()). diff --git a/packaging/debs/apt-repository/README-real-repository b/packaging/debs/apt-repository/README-real-repository index b152622731..189852eb60 100644 --- a/packaging/debs/apt-repository/README-real-repository +++ b/packaging/debs/apt-repository/README-real-repository @@ -13,7 +13,7 @@ that's a baby rabbit. Secondly, a note on software. We need a tool to manage the repository, and a tool to perform uploads to the repository. Debian being Debian -there are quite a few of each. We will use "rerepro" to manage the +there are quite a few of each. We will use "reprepro" to manage the repository since it's modern, maintained, and fairly simple. We will use "dupload" to perform the uploads since it gives us the ability to run arbitrary commands after the upload, which means we don't need to run a @@ -99,7 +99,7 @@ machine * "rm -rf" the uploads folder This is a bit cheesy but should be enough for our purposes. The -dupload.conf uses scp and ssh so you need a public-key login (or tpye +dupload.conf uses scp and ssh so you need a public-key login (or type your password lots). There's still an open question as to whether dupload is really needed diff --git a/packaging/macports/Portfile.in b/packaging/macports/Portfile.in index 360fb394eb..cde02b1aa9 100644 --- a/packaging/macports/Portfile.in +++ b/packaging/macports/Portfile.in @@ -48,6 +48,7 @@ set serveruser rabbitmq set servergroup rabbitmq set serverhome ${prefix}/var/lib/rabbitmq set logdir ${prefix}/var/log/rabbitmq +set confdir ${prefix}/etc/rabbitmq set mnesiadbdir ${prefix}/var/lib/rabbitmq/mnesia set plistloc ${prefix}/etc/LaunchDaemons/org.macports.rabbitmq-server set sbindir ${destroot}${prefix}/lib/rabbitmq/bin @@ -74,6 +75,7 @@ destroot.destdir \ MAN_DIR=${destroot}${prefix}/share/man destroot.keepdirs \ + ${destroot}${confdir} \ ${destroot}${logdir} \ ${destroot}${mnesiadbdir} @@ -83,17 +85,16 @@ pre-destroot { } post-destroot { + xinstall -d -m 775 ${destroot}${confdir} xinstall -d -g [existsgroup ${servergroup}] -m 775 ${destroot}${logdir} xinstall -d -g [existsgroup ${servergroup}] -m 775 ${destroot}${serverhome} xinstall -d -g [existsgroup ${servergroup}] -m 775 ${destroot}${mnesiadbdir} - reinplace -E "s:(/etc/rabbitmq/rabbitmq):${prefix}\\1:g" \ + reinplace -E "s: (/etc/rabbitmq/rabbitmq): ${prefix}\\1:g" \ ${realsbin}/rabbitmq-env foreach var {CONFIG_FILE LOG_BASE MNESIA_BASE ENABLED_PLUGINS_FILE} { reinplace -E "s:^($var)=/:\\1=${prefix}/:" \ - ${realsbin}/rabbitmq-server \ - ${realsbin}/rabbitmqctl \ - ${realsbin}/rabbitmq-plugins + ${realsbin}/rabbitmq-env } xinstall -m 555 ${filespath}/rabbitmq-script-wrapper \ diff --git a/scripts/rabbitmq-echopid.bat b/scripts/rabbitmq-echopid.bat new file mode 100644 index 0000000000..5c652c30c0 --- /dev/null +++ b/scripts/rabbitmq-echopid.bat @@ -0,0 +1,49 @@ +@echo off + +REM Usage: rabbitmq-echopid.bat <rabbitmq_nodename> +REM +REM <rabbitmq_nodename> sname of the erlang node to connect to (required) + +setlocal + +if "%1"=="" goto fail + +:: set timeout vars :: +set TIMEOUT=10 +set TIMER=1 + +:: check that wmic exists :: +set WMIC_PATH=%SYSTEMROOT%\System32\Wbem\wmic.exe +if not exist "%WMIC_PATH%" ( + goto fail +) + +:getpid +for /f "usebackq tokens=* skip=1" %%P IN (`%%WMIC_PATH%% process where "name='erl.exe' and commandline like '%%-sname %1%%'" get processid 2^>nul`) do ( + set PID=%%P + goto echopid +) + +:echopid +:: check for pid not found :: +if "%PID%" == "" ( + PING 127.0.0.1 -n 2 > nul + set /a TIMER+=1 + if %TIMEOUT%==%TIMER% goto fail + goto getpid +) + +:: show pid :: +echo %PID% + +:: all done :: +:ok +endlocal +EXIT /B 0 + +:: something went wrong :: +:fail +endlocal +EXIT /B 1 + + diff --git a/src/rabbit.erl b/src/rabbit.erl index 0a0ca90a63..dd5fb89ce4 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -212,14 +212,13 @@ -type(file_suffix() :: binary()). %% this really should be an abstract type -type(log_location() :: 'tty' | 'undefined' | file:filename()). +-type(param() :: atom()). -spec(maybe_hipe_compile/0 :: () -> 'ok'). -spec(prepare/0 :: () -> 'ok'). -spec(start/0 :: () -> 'ok'). -spec(stop/0 :: () -> 'ok'). -spec(stop_and_halt/0 :: () -> no_return()). --spec(rotate_logs/1 :: (file_suffix()) -> rabbit_types:ok_or_error(any())). --spec(force_event_refresh/0 :: () -> 'ok'). -spec(status/0 :: () -> [{pid, integer()} | {running_applications, [{atom(), string(), string()}]} | @@ -228,12 +227,11 @@ {memory, any()}]). -spec(is_running/0 :: () -> boolean()). -spec(is_running/1 :: (node()) -> boolean()). --spec(environment/0 :: () -> [{atom() | term()}]). --spec(log_location/1 :: ('sasl' | 'kernel') -> log_location()). +-spec(environment/0 :: () -> [{param() | term()}]). +-spec(rotate_logs/1 :: (file_suffix()) -> rabbit_types:ok_or_error(any())). +-spec(force_event_refresh/0 :: () -> 'ok'). --spec(maybe_insert_default_data/0 :: () -> 'ok'). --spec(boot_delegate/0 :: () -> 'ok'). --spec(recover/0 :: () -> 'ok'). +-spec(log_location/1 :: ('sasl' | 'kernel') -> log_location()). -spec(start/2 :: ('normal',[]) -> {'error', @@ -243,6 +241,10 @@ {'ok',pid()}). -spec(stop/1 :: (_) -> 'ok'). +-spec(maybe_insert_default_data/0 :: () -> 'ok'). +-spec(boot_delegate/0 :: () -> 'ok'). +-spec(recover/0 :: () -> 'ok'). + -endif. %%---------------------------------------------------------------------------- @@ -712,6 +714,6 @@ config_files() -> case init:get_argument(config) of {ok, Files} -> [filename:absname( filename:rootname(File, ".config") ++ ".config") || - File <- Files]; + [File] <- Files]; error -> [] end. diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index a7dfd535c8..c95efa1447 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -334,12 +334,23 @@ check_declare_arguments(QueueName, Args) -> precondition_failed, "invalid arg '~s' for ~s: ~255p", [Key, rabbit_misc:rs(QueueName), Error]) - end || {Key, Fun} <- + end || + {Key, Fun} <- [{<<"x-expires">>, fun check_integer_argument/2}, {<<"x-message-ttl">>, fun check_integer_argument/2}, - {<<"x-ha-policy">>, fun check_ha_policy_argument/2}]], + {<<"x-ha-policy">>, fun check_ha_policy_argument/2}, + {<<"x-dead-letter-exchange">>, fun check_string_argument/2}, + {<<"x-dead-letter-routing-key">>, + fun check_dlxrk_argument/2}]], ok. +check_string_argument(undefined, _Args) -> + ok; +check_string_argument({longstr, _}, _Args) -> + ok; +check_string_argument({Type, _}, _) -> + {error, {unacceptable_type, Type}}. + check_integer_argument(undefined, _Args) -> ok; check_integer_argument({Type, Val}, _Args) when Val > 0 -> @@ -350,6 +361,16 @@ check_integer_argument({Type, Val}, _Args) when Val > 0 -> check_integer_argument({_Type, Val}, _Args) -> {error, {value_zero_or_less, Val}}. +check_dlxrk_argument(undefined, _Args) -> + ok; +check_dlxrk_argument({longstr, _}, Args) -> + case rabbit_misc:table_lookup(Args, <<"x-dead-letter-exchange">>) of + undefined -> {error, routing_key_but_no_dlx_defined}; + _ -> ok + end; +check_dlxrk_argument({Type, _}, _Args) -> + {error, {unacceptable_type, Type}}. + check_ha_policy_argument(undefined, _Args) -> ok; check_ha_policy_argument({longstr, <<"all">>}, _Args) -> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 12cd0c93ff..fd2d7214d2 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -49,7 +49,14 @@ stats_timer, msg_id_to_channel, ttl, - ttl_timer_ref + ttl_timer_ref, + publish_seqno, + unconfirmed_mq, + unconfirmed_qm, + delayed_stop, + queue_monitors, + dlx, + dlx_routing_key }). -record(consumer, {tag, ack_required}). @@ -128,6 +135,13 @@ init(Q) -> rate_timer_ref = undefined, expiry_timer_ref = undefined, ttl = undefined, + dlx = undefined, + dlx_routing_key = undefined, + publish_seqno = 1, + unconfirmed_mq = gb_trees:empty(), + unconfirmed_qm = gb_trees:empty(), + delayed_stop = undefined, + queue_monitors = dict:new(), msg_id_to_channel = gb_trees:empty()}, {ok, rabbit_event:init_stats_timer(State, #q.stats_timer), hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -149,6 +163,11 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, rate_timer_ref = RateTRef, expiry_timer_ref = undefined, ttl = undefined, + publish_seqno = 1, + unconfirmed_mq = gb_trees:empty(), + unconfirmed_qm = gb_trees:empty(), + delayed_stop = undefined, + queue_monitors = dict:new(), msg_id_to_channel = MTC}, State1 = requeue_and_run(AckTags, process_args( rabbit_event:init_stats_timer( @@ -210,18 +229,28 @@ bq_init(BQ, Q, Recover) -> end). process_args(State = #q{q = #amqqueue{arguments = Arguments}}) -> - lists:foldl(fun({Arg, Fun}, State1) -> - case rabbit_misc:table_lookup(Arguments, Arg) of - {_Type, Val} -> Fun(Val, State1); - undefined -> State1 - end - end, State, [{<<"x-expires">>, fun init_expires/2}, - {<<"x-message-ttl">>, fun init_ttl/2}]). + lists:foldl( + fun({Arg, Fun}, State1) -> + case rabbit_misc:table_lookup(Arguments, Arg) of + {_Type, Val} -> Fun(Val, State1); + undefined -> State1 + end + end, State, + [{<<"x-expires">>, fun init_expires/2}, + {<<"x-message-ttl">>, fun init_ttl/2}, + {<<"x-dead-letter-exchange">>, fun init_dlx/2}, + {<<"x-dead-letter-routing-key">>, fun init_dlx_routing_key/2}]). init_expires(Expires, State) -> ensure_expiry_timer(State#q{expires = Expires}). init_ttl(TTL, State) -> drop_expired_messages(State#q{ttl = TTL}). +init_dlx(DLX, State = #q{q = #amqqueue{name = QName}}) -> + State#q{dlx = rabbit_misc:r(QName, exchange, DLX)}. + +init_dlx_routing_key(RoutingKey, State) -> + State#q{dlx_routing_key = RoutingKey}. + terminate_shutdown(Fun, State) -> State1 = #q{backing_queue_state = BQS} = stop_sync_timer(stop_rate_timer(State)), @@ -449,34 +478,36 @@ confirm_messages(MsgIds, State = #q{msg_id_to_channel = MTC}) -> lists:foldl( fun(MsgId, {CMs, MTC0}) -> case gb_trees:lookup(MsgId, MTC0) of - {value, {ChPid, MsgSeqNo}} -> - {rabbit_misc:gb_trees_cons(ChPid, MsgSeqNo, CMs), + {value, {SenderPid, MsgSeqNo}} -> + {rabbit_misc:gb_trees_cons(SenderPid, + MsgSeqNo, CMs), gb_trees:delete(MsgId, MTC0)}; none -> {CMs, MTC0} end end, {gb_trees:empty(), MTC}, MsgIds), - rabbit_misc:gb_trees_foreach(fun rabbit_channel:confirm/2, CMs), + rabbit_misc:gb_trees_foreach(fun rabbit_misc:confirm_to_sender/2, CMs), State#q{msg_id_to_channel = MTC1}. should_confirm_message(#delivery{msg_seq_no = undefined}, _State) -> never; -should_confirm_message(#delivery{sender = ChPid, +should_confirm_message(#delivery{sender = SenderPid, msg_seq_no = MsgSeqNo, message = #basic_message { is_persistent = true, id = MsgId}}, #q{q = #amqqueue{durable = true}}) -> - {eventually, ChPid, MsgSeqNo, MsgId}; + {eventually, SenderPid, MsgSeqNo, MsgId}; should_confirm_message(_Delivery, _State) -> immediately. needs_confirming({eventually, _, _, _}) -> true; needs_confirming(_) -> false. -maybe_record_confirm_message({eventually, ChPid, MsgSeqNo, MsgId}, +maybe_record_confirm_message({eventually, SenderPid, MsgSeqNo, MsgId}, State = #q{msg_id_to_channel = MTC}) -> - State#q{msg_id_to_channel = gb_trees:insert(MsgId, {ChPid, MsgSeqNo}, MTC)}; + State#q{msg_id_to_channel = + gb_trees:insert(MsgId, {SenderPid, MsgSeqNo}, MTC)}; maybe_record_confirm_message(_Confirm, State) -> State. @@ -488,13 +519,13 @@ run_message_queue(State) -> BQ:is_empty(BQS), State1), State2. -attempt_delivery(Delivery = #delivery{sender = ChPid, +attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message, msg_seq_no = MsgSeqNo}, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> Confirm = should_confirm_message(Delivery, State), case Confirm of - immediately -> rabbit_channel:confirm(ChPid, [MsgSeqNo]); + immediately -> rabbit_misc:confirm_to_sender(SenderPid, [MsgSeqNo]); _ -> ok end, case BQ:is_duplicate(Message, BQS) of @@ -509,7 +540,7 @@ attempt_delivery(Delivery = #delivery{sender = ChPid, AckRequired, Message, (?BASE_MESSAGE_PROPERTIES)#message_properties{ needs_confirming = needs_confirming(Confirm)}, - ChPid, BQS2), + SenderPid, BQS2), {{Message, false, AckTag}, true, State1#q{backing_queue_state = BQS3}} end, @@ -530,7 +561,7 @@ attempt_delivery(Delivery = #delivery{sender = ChPid, end. deliver_or_enqueue(Delivery = #delivery{message = Message, - sender = ChPid}, State) -> + sender = SenderPid}, State) -> {Delivered, Confirm, State1} = attempt_delivery(Delivery, State), State2 = #q{backing_queue = BQ, backing_queue_state = BQS} = maybe_record_confirm_message(Confirm, State1), @@ -538,7 +569,7 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, true -> State2; false -> Props = (message_properties(State)) #message_properties{ needs_confirming = needs_confirming(Confirm)}, - BQS1 = BQ:publish(Message, Props, ChPid, BQS), + BQS1 = BQ:publish(Message, Props, SenderPid, BQS), ensure_ttl_timer(State2#q{backing_queue_state = BQS1}) end. @@ -659,11 +690,11 @@ subtract_acks(ChPid, AckTags, State, Fun) -> Fun(State) end. -discard_delivery(#delivery{sender = ChPid, +discard_delivery(#delivery{sender = SenderPid, message = Message}, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> - State#q{backing_queue_state = BQ:discard(Message, ChPid, BQS)}. + State#q{backing_queue_state = BQ:discard(Message, SenderPid, BQS)}. message_properties(#q{ttl=TTL}) -> #message_properties{expiry = calculate_msg_expiry(TTL)}. @@ -674,10 +705,11 @@ calculate_msg_expiry(TTL) -> now_micros() + (TTL * 1000). drop_expired_messages(State = #q{ttl = undefined}) -> State; drop_expired_messages(State = #q{backing_queue_state = BQS, - backing_queue = BQ}) -> + backing_queue = BQ}) -> Now = now_micros(), BQS1 = BQ:dropwhile( fun (#message_properties{expiry = Expiry}) -> Now > Expiry end, + dead_letter_fun(expired, State), BQS), ensure_ttl_timer(State#q{backing_queue_state = BQS1}). @@ -694,6 +726,199 @@ ensure_ttl_timer(State = #q{backing_queue = BQ, ensure_ttl_timer(State) -> State. +dead_letter_fun(_Reason, #q{dlx = undefined}) -> + undefined; +dead_letter_fun(Reason, _State) -> + fun(Msg, AckTag) -> + gen_server2:cast(self(), {dead_letter, {Msg, AckTag}, Reason}) + end. + +dead_letter_msg(Msg, AckTag, Reason, State = #q{dlx = DLX}) -> + case rabbit_exchange:lookup(DLX) of + {error, not_found} -> + noreply(State); + _ -> + dead_letter_msg_existing_dlx(Msg, AckTag, Reason, State) + end. + +dead_letter_msg_existing_dlx(Msg, AckTag, Reason, + State = #q{publish_seqno = MsgSeqNo, + unconfirmed_mq = UMQ, + dlx = DLX, + backing_queue = BQ, + backing_queue_state = BQS}) -> + {ok, _, QPids} = + rabbit_basic:publish( + rabbit_basic:delivery( + false, false, make_dead_letter_msg(DLX, Reason, Msg, State), + MsgSeqNo)), + State1 = lists:foldl(fun monitor_queue/2, State, QPids), + State2 = State1#q{publish_seqno = MsgSeqNo + 1}, + case QPids of + [] -> {_, BQS1} = BQ:ack([AckTag], BQS), + cleanup_after_confirm(State2#q{backing_queue_state = BQS1}); + _ -> State3 = + lists:foldl( + fun(QPid, State0 = #q{unconfirmed_qm = UQM}) -> + UQM1 = rabbit_misc:gb_trees_set_insert( + QPid, MsgSeqNo, UQM), + State0#q{unconfirmed_qm = UQM1} + end, State2, QPids), + noreply(State3#q{ + unconfirmed_mq = + gb_trees:insert( + MsgSeqNo, {gb_sets:from_list(QPids), + AckTag}, UMQ)}) + end. + +monitor_queue(QPid, State = #q{queue_monitors = QMons}) -> + case dict:is_key(QPid, QMons) of + true -> State; + false -> State#q{queue_monitors = + dict:store(QPid, erlang:monitor(process, QPid), + QMons)} + end. + +demonitor_queue(QPid, State = #q{queue_monitors = QMons}) -> + case dict:find(QPid, QMons) of + {ok, MRef} -> erlang:demonitor(MRef), + State#q{queue_monitors = dict:erase(QPid, QMons)}; + error -> State + end. + +handle_queue_down(QPid, Reason, State = #q{queue_monitors = QMons, + unconfirmed_qm = UQM}) -> + case dict:find(QPid, QMons) of + error -> + noreply(State); + {ok, _} -> + #resource{name = QName} = qname(State), + rabbit_log:info("DLQ ~p (for ~p) died~n", [QPid, QName]), + case gb_trees:lookup(QPid, UQM) of + none -> + noreply(State); + {value, MsgSeqNosSet} -> + case rabbit_misc:is_abnormal_termination(Reason) of + true -> rabbit_log:warning( + "Dead queue lost ~p messages~n", + [gb_sets:size(MsgSeqNosSet)]); + false -> ok + end, + handle_confirm(gb_sets:to_list(MsgSeqNosSet), QPid, + State#q{queue_monitors = + dict:erase(QPid, QMons)}) + end + end. + +handle_confirm(MsgSeqNos, QPid, State = #q{unconfirmed_mq = UMQ, + unconfirmed_qm = UQM, + backing_queue = BQ, + backing_queue_state = BQS}) -> + {AckTags1, UMQ3} = + lists:foldl( + fun (MsgSeqNo, {AckTags, UMQ1}) -> + {QPids, AckTag} = gb_trees:get(MsgSeqNo, UMQ1), + QPids1 = gb_sets:delete(QPid, QPids), + case gb_sets:is_empty(QPids1) of + true -> {[AckTag | AckTags], + gb_trees:delete(MsgSeqNo, UMQ1)}; + false -> {AckTags, gb_trees:update( + MsgSeqNo, {QPids1, AckTag}, UMQ1)} + end + end, {[], UMQ}, MsgSeqNos), + {_Guids, BQS1} = BQ:ack(AckTags1, BQS), + MsgSeqNos1 = gb_sets:difference(gb_trees:get(QPid, UQM), + gb_sets:from_list(MsgSeqNos)), + State1 = case gb_sets:is_empty(MsgSeqNos1) of + false -> State#q{ + unconfirmed_qm = + gb_trees:update(QPid, MsgSeqNos1, UQM)}; + true -> demonitor_queue( + QPid, State#q{ + unconfirmed_qm = + gb_trees:delete(QPid, UQM)}) + end, + cleanup_after_confirm(State1#q{unconfirmed_mq = UMQ3, + backing_queue_state = BQS1}). + +stop_later(Reason, State) -> + stop_later(Reason, undefined, noreply, State). + +stop_later(Reason, From, Reply, State = #q{unconfirmed_mq = UMQ}) -> + case {gb_trees:is_empty(UMQ), Reply} of + {true, noreply} -> + {stop, Reason, State}; + {true, _} -> + {stop, Reason, Reply, State}; + {false, _} -> + noreply(State#q{delayed_stop = {Reason, {From, Reply}}}) + end. + +cleanup_after_confirm(State = #q{delayed_stop = DS, + unconfirmed_mq = UMQ}) -> + case gb_trees:is_empty(UMQ) andalso DS =/= undefined of + true -> case DS of + {_, {_, noreply}} -> ok; + {_, {From, Reply}} -> gen_server2:reply(From, Reply) + end, + {Reason, _} = DS, + {stop, Reason, State}; + false -> noreply(State) + end. + +already_been_here(_Delivery, #q{dlx = undefined}) -> + false; +already_been_here(#delivery{message = #basic_message{content = Content}}, + State) -> + #content{properties = #'P_basic'{headers = Headers}} = + rabbit_binary_parser:ensure_content_decoded(Content), + #resource{name = QueueName} = qname(State), + case Headers of + undefined -> + false; + _ -> + case rabbit_misc:table_lookup(Headers, <<"x-death">>) of + {array, DeathTables} -> + OldQueues = [rabbit_misc:table_lookup(D, <<"queue">>) || + {table, D} <- DeathTables], + OldQueues1 = [QName || {longstr, QName} <- OldQueues], + case lists:member(QueueName, OldQueues1) of + true -> [QueueName | OldQueues1]; + _ -> false + end; + _ -> + false + end + end. + +make_dead_letter_msg(DLX, Reason, + Msg = #basic_message{content = Content, + exchange_name = Exchange, + routing_keys = RoutingKeys}, + State = #q{dlx_routing_key = DlxRoutingKey}) -> + Headers = rabbit_basic:extract_headers(Content), + #resource{name = QName} = qname(State), + %% The first routing key is the one specified in the + %% basic.publish; all others are CC or BCC keys. + RoutingKeys1 = [hd(RoutingKeys) | rabbit_basic:header_routes(Headers)], + Info = [{<<"reason">>, longstr, list_to_binary(atom_to_list(Reason))}, + {<<"queue">>, longstr, QName}, + {<<"time">>, timestamp, rabbit_misc:now_ms() div 1000}, + {<<"exchange">>, longstr, Exchange#resource.name}, + {<<"routing-keys">>, array, + [{longstr, Key} || Key <- RoutingKeys1]}], + Headers1 = rabbit_basic:append_table_header(<<"x-death">>, Info, Headers), + {DeathRoutingKeys, Headers2} = + case DlxRoutingKey of + undefined -> {RoutingKeys, Headers1}; + _ -> {[DlxRoutingKey], + lists:keydelete(<<"CC">>, 1, Headers1)} + end, + Content1 = rabbit_basic:replace_headers(Headers2, Content), + Msg#basic_message{exchange_name = DLX, id = rabbit_guid:gen(), + routing_keys = DeathRoutingKeys, content = Content1}. + + now_micros() -> timer:now_diff(now(), {0,0,0}). infos(Items, State) -> @@ -835,6 +1060,9 @@ prioritise_info(Msg, #q{q = #amqqueue{exclusive_owner = DownPid}}) -> _ -> 0 end. +handle_call(_, _, State = #q{delayed_stop = DS}) when DS =/= undefined -> + noreply(State); + handle_call({init, Recover}, From, State = #q{q = #amqqueue{exclusive_owner = none}}) -> declare(Recover, From, State); @@ -891,15 +1119,15 @@ handle_call({deliver, Delivery = #delivery{mandatory = true}}, From, State) -> gen_server2:reply(From, true), noreply(deliver_or_enqueue(Delivery, State)); -handle_call({notify_down, ChPid}, _From, State) -> +handle_call({notify_down, ChPid}, From, State) -> %% we want to do this synchronously, so that auto_deleted queues %% are no longer visible by the time we send a response to the %% client. The queue is ultimately deleted in terminate/2; if we %% return stop with a reply, terminate/2 will be called by %% gen_server2 *before* the reply is sent. case handle_ch_down(ChPid, State) of - {ok, NewState} -> reply(ok, NewState); - {stop, NewState} -> {stop, normal, ok, NewState} + {ok, State1} -> reply(ok, State1); + {stop, State1} -> stop_later(normal, From, ok, State1) end; handle_call({basic_get, ChPid, NoAck}, _From, @@ -954,7 +1182,7 @@ handle_call({basic_consume, NoAck, ChPid, Limiter, reply(ok, State2) end; -handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, +handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From, State = #q{exclusive_consumer = Holder}) -> ok = maybe_send_reply(ChPid, OkMsg), case lookup_ch(ChPid) of @@ -974,7 +1202,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, State#q.active_consumers)}, case should_auto_delete(State1) of false -> reply(ok, ensure_expiry_timer(State1)); - true -> {stop, normal, ok, State1} + true -> stop_later(normal, From, ok, State1) end end; @@ -983,20 +1211,18 @@ handle_call(stat, _From, State) -> drop_expired_messages(ensure_expiry_timer(State)), reply({ok, BQ:len(BQS), active_consumer_count()}, State1); -handle_call({delete, IfUnused, IfEmpty}, _From, +handle_call({delete, IfUnused, IfEmpty}, From, State = #q{backing_queue_state = BQS, backing_queue = BQ}) -> IsEmpty = BQ:is_empty(BQS), IsUnused = is_unused(State), if - IfEmpty and not(IsEmpty) -> - reply({error, not_empty}, State); - IfUnused and not(IsUnused) -> - reply({error, in_use}, State); - true -> - {stop, normal, {ok, BQ:len(BQS)}, State} + IfEmpty and not(IsEmpty) -> reply({error, not_empty}, State); + IfUnused and not(IsUnused) -> reply({error, in_use}, State); + true -> stop_later(normal, From, + {ok, BQ:len(BQS)}, State) end; -handle_call(purge, _From, State = #q{backing_queue = BQ, +handle_call(purge, _From, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> {Count, BQS1} = BQ:purge(BQS), reply({ok, Count}, State#q{backing_queue_state = BQS1}); @@ -1007,10 +1233,18 @@ handle_call({requeue, AckTags, ChPid}, From, State) -> ChPid, AckTags, State, fun (State1) -> requeue_and_run(AckTags, State1) end)). +handle_cast({confirm, MsgSeqNos, QPid}, State) -> + handle_confirm(MsgSeqNos, QPid, State); + +handle_cast(_, State = #q{delayed_stop = DS}) when DS =/= undefined -> + noreply(State); + handle_cast({run_backing_queue, Mod, Fun}, State) -> noreply(run_backing_queue(Mod, Fun, State)); -handle_cast({deliver, Delivery = #delivery{sender = Sender}, Flow}, State) -> +handle_cast({deliver, Delivery = #delivery{sender = Sender, + msg_seq_no = MsgSeqNo}, Flow}, + State) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. case Flow of flow -> Key = {ch_publisher, Sender}, @@ -1021,7 +1255,12 @@ handle_cast({deliver, Delivery = #delivery{sender = Sender}, Flow}, State) -> credit_flow:ack(Sender); noflow -> ok end, - noreply(deliver_or_enqueue(Delivery, State)); + case already_been_here(Delivery, State) of + false -> noreply(deliver_or_enqueue(Delivery, State)); + Qs -> log_cycle_once(Qs), + rabbit_misc:confirm_to_sender(Sender, [MsgSeqNo]), + noreply(State) + end; handle_cast({ack, AckTags, ChPid}, State) -> noreply(subtract_acks( @@ -1039,13 +1278,14 @@ handle_cast({reject, AckTags, Requeue, ChPid}, State) -> backing_queue_state = BQS}) -> case Requeue of true -> requeue_and_run(AckTags, State1); - false -> {_Guids, BQS1} = BQ:ack(AckTags, BQS), + false -> Fun = dead_letter_fun(rejected, State), + BQS1 = BQ:fold(Fun, BQS, AckTags), State1#q{backing_queue_state = BQS1} end end)); handle_cast(delete_immediately, State) -> - {stop, normal, State}; + stop_later(normal, State); handle_cast({unblock, ChPid}, State) -> noreply( @@ -1096,11 +1336,17 @@ handle_cast(force_event_refresh, State = #q{exclusive_consumer = Exclusive}) -> {Ch, CTag} -> [{Ch, CTag, AckRequired}] = consumers(State), emit_consumer_created(Ch, CTag, true, AckRequired) end, - noreply(State). + noreply(State); + +handle_cast({dead_letter, {Msg, AckTag}, Reason}, State) -> + dead_letter_msg(Msg, AckTag, Reason, State). + +handle_info(_, State = #q{delayed_stop = DS}) when DS =/= undefined -> + noreply(State); handle_info(maybe_expire, State) -> case is_unused(State) of - true -> {stop, normal, State}; + true -> stop_later(normal, State); false -> noreply(ensure_expiry_timer(State)) end; @@ -1122,11 +1368,11 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, %% match what people expect (see bug 21824). However we need this %% monitor-and-async- delete in case the connection goes away %% unexpectedly. - {stop, normal, State}; -handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) -> + stop_later(normal, State); +handle_info({'DOWN', _MonitorRef, process, DownPid, Reason}, State) -> case handle_ch_down(DownPid, State) of - {ok, NewState} -> noreply(NewState); - {stop, NewState} -> {stop, normal, NewState} + {ok, State1} -> handle_queue_down(DownPid, Reason, State1); + {stop, State1} -> stop_later(normal, State1) end; handle_info(update_ram_duration, State = #q{backing_queue = BQ, @@ -1171,3 +1417,14 @@ handle_pre_hibernate(State = #q{backing_queue = BQ, {hibernate, stop_rate_timer(State1)}. format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ). + +log_cycle_once(Queues) -> + Key = {queue_cycle, Queues}, + case get(Key) of + true -> ok; + undefined -> rabbit_log:warning( + "Message dropped. Dead-letter queues cycle detected" ++ + ": ~p~nThis cycle will NOT be reported again.~n", + [Queues]), + put(Key, true) + end. diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 364eb8f646..42627aae7e 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -95,16 +95,24 @@ behaviour_info(callbacks) -> {drain_confirmed, 1}, %% Drop messages from the head of the queue while the supplied - %% predicate returns true. - {dropwhile, 2}, + %% predicate returns true. A callback function is supplied + %% allowing callers access to messages that are about to be + %% dropped. + {dropwhile, 3}, %% Produce the next message. {fetch, 2}, %% Acktags supplied are for messages which can now be forgotten - %% about. Must return 1 msg_id per Ack, in the same order as Acks. + %% about. Must return 1 msg_id per Ack, in the same order as + %% Acks. {ack, 2}, + %% Acktags supplied are for messages which should be + %% processed. The provided callback function is called with each + %% message. + {fold, 3}, + %% Reinsert messages into the queue which have already been %% delivered and were pending acknowledgement. {requeue, 2}, diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index b8211d4332..25485ca0b0 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -19,7 +19,8 @@ -include("rabbit_framing.hrl"). -export([publish/4, publish/6, publish/1, - message/3, message/4, properties/1, delivery/4]). + message/3, message/4, properties/1, append_table_header/3, + extract_headers/1, replace_headers/2, delivery/4, header_routes/1]). -export([build_content/2, from_content/1]). %%---------------------------------------------------------------------------- @@ -31,6 +32,7 @@ -type(publish_result() :: ({ok, rabbit_amqqueue:routing_result(), [pid()]} | rabbit_types:error('not_found'))). +-type(headers() :: rabbit_framing:amqp_table() | 'undefined'). -type(exchange_input() :: (rabbit_types:exchange() | rabbit_exchange:name())). -type(body_input() :: (binary() | [binary()])). @@ -55,6 +57,17 @@ rabbit_types:ok_or_error2(rabbit_types:message(), any())). -spec(properties/1 :: (properties_input()) -> rabbit_framing:amqp_property_record()). + +-spec(append_table_header/3 :: + (binary(), rabbit_framing:amqp_table(), headers()) -> headers()). + +-spec(extract_headers/1 :: (rabbit_types:content()) -> headers()). + +-spec(replace_headers/2 :: (headers(), rabbit_types:content()) + -> rabbit_types:content()). + +-spec(header_routes/1 :: + (undefined | rabbit_framing:amqp_table()) -> [string()]). -spec(build_content/2 :: (rabbit_framing:amqp_property_record(), binary() | [binary()]) -> rabbit_types:content()). -spec(from_content/1 :: (rabbit_types:content()) -> @@ -166,6 +179,24 @@ properties(P) when is_list(P) -> end end, #'P_basic'{}, P). +append_table_header(Name, Info, undefined) -> + append_table_header(Name, Info, []); +append_table_header(Name, Info, Headers) -> + Prior = case rabbit_misc:table_lookup(Headers, Name) of + undefined -> []; + {array, Existing} -> Existing + end, + rabbit_misc:set_table_value(Headers, Name, array, [{table, Info} | Prior]). + +extract_headers(Content) -> + #content{properties = #'P_basic'{headers = Headers}} = + rabbit_binary_parser:ensure_content_decoded(Content), + Headers. + +replace_headers(Headers, Content = #content{properties = Props}) -> + rabbit_binary_generator:clear_encoded_content( + Content#content{properties = Props#'P_basic'{headers = Headers}}). + indexof(L, Element) -> indexof(L, Element, 1). indexof([], _Element, _N) -> 0; diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 24cfbcf890..d5cba91b2e 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -21,7 +21,7 @@ -behaviour(gen_server2). -export([start_link/11, do/2, do/3, do_flow/3, flush/1, shutdown/1]). --export([send_command/2, deliver/4, flushed/2, confirm/2]). +-export([send_command/2, deliver/4, flushed/2]). -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]). -export([refresh_config_local/0, ready_for_close/1]). -export([force_event_refresh/0]). @@ -89,7 +89,6 @@ (pid(), rabbit_types:ctag(), boolean(), rabbit_amqqueue:qmsg()) -> 'ok'). -spec(flushed/2 :: (pid(), pid()) -> 'ok'). --spec(confirm/2 ::(pid(), [non_neg_integer()]) -> 'ok'). -spec(list/0 :: () -> [pid()]). -spec(list_local/0 :: () -> [pid()]). -spec(info_keys/0 :: () -> rabbit_types:info_keys()). @@ -136,9 +135,6 @@ deliver(Pid, ConsumerTag, AckRequired, Msg) -> flushed(Pid, QPid) -> gen_server2:cast(Pid, {flushed, QPid}). -confirm(Pid, MsgSeqNos) -> - gen_server2:cast(Pid, {confirm, MsgSeqNos, self()}). - list() -> rabbit_misc:append_rpc_all_nodes(rabbit_mnesia:running_clustered_nodes(), rabbit_channel, list_local, []). @@ -1166,14 +1162,9 @@ handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed_qm = UQM}) -> %% the set one by one which which would be inefficient State1 = State#ch{unconfirmed_qm = gb_trees:delete_any(QPid, UQM)}, {Nack, SendFun} = - case Reason of - Reason when Reason =:= noproc; Reason =:= noconnection; - Reason =:= normal; Reason =:= shutdown -> - {false, fun record_confirms/2}; - {shutdown, _} -> - {false, fun record_confirms/2}; - _ -> - {true, fun send_nacks/2} + case rabbit_misc:is_abnormal_termination(Reason) of + true -> {true, fun send_nacks/2}; + false -> {false, fun record_confirms/2} end, {MXs, State2} = process_confirms(MsgSeqNos, QPid, Nack, State1), SendFun(MXs, State2). diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 64a4a7371e..bfdab487f3 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -18,10 +18,10 @@ -export([init/3, terminate/2, delete_and_terminate/2, purge/1, publish/4, publish_delivered/5, fetch/2, ack/2, - requeue/2, len/1, is_empty/1, drain_confirmed/1, dropwhile/2, + requeue/2, len/1, is_empty/1, drain_confirmed/1, dropwhile/3, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, - status/1, invoke/3, is_duplicate/2, discard/3]). + status/1, invoke/3, is_duplicate/2, discard/3, fold/3]). -export([start/1, stop/0]). @@ -172,12 +172,13 @@ publish_delivered(AckRequired, Msg = #basic_message { id = MsgId }, MsgProps, ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1, ack_msg_id = AM1 })}. -dropwhile(Fun, State = #state { gm = GM, - backing_queue = BQ, - backing_queue_state = BQS, - set_delivered = SetDelivered }) -> +dropwhile(Pred, MsgFun, + State = #state{gm = GM, + backing_queue = BQ, + set_delivered = SetDelivered, + backing_queue_state = BQS }) -> Len = BQ:len(BQS), - BQS1 = BQ:dropwhile(Fun, BQS), + BQS1 = BQ:dropwhile(Pred, MsgFun, BQS), Dropped = Len - BQ:len(BQS1), SetDelivered1 = lists:max([0, SetDelivered - Dropped]), ok = gm:broadcast(GM, {set_length, BQ:len(BQS1)}), @@ -248,6 +249,13 @@ ack(AckTags, State = #state { gm = GM, {MsgIds, State #state { backing_queue_state = BQS1, ack_msg_id = AM1 }}. +fold(MsgFun, State = #state { gm = GM, + backing_queue = BQ, + backing_queue_state = BQS}, AckTags) -> + BQS1 = BQ:fold(MsgFun, BQS, AckTags), + ok = gm:broadcast(GM, {fold, MsgFun, AckTags}), + State #state { backing_queue_state = BQS1 }. + requeue(AckTags, State = #state { gm = GM, backing_queue = BQ, backing_queue_state = BQS }) -> diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 9bf89bce3f..98a80a2619 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -430,7 +430,7 @@ confirm_messages(MsgIds, State = #state { msg_id_status = MS }) -> Acc end end, {gb_trees:empty(), MS}, MsgIds), - rabbit_misc:gb_trees_foreach(fun rabbit_channel:confirm/2, CMs), + rabbit_misc:gb_trees_foreach(fun rabbit_misc:confirm_to_sender/2, CMs), State #state { msg_id_status = MS1 }. handle_process_result({ok, State}) -> noreply(State); @@ -665,7 +665,7 @@ maybe_enqueue_message( {ok, {confirmed, ChPid}} -> %% BQ has confirmed it but we didn't know what the %% msg_seq_no was at the time. We do now! - ok = rabbit_channel:confirm(ChPid, [MsgSeqNo]), + ok = rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]), SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ), State1 #state { sender_queues = SQ1, msg_id_status = dict:erase(MsgId, MS) }; @@ -682,7 +682,7 @@ maybe_enqueue_message( msg_id_status = dict:store(MsgId, {published, ChPid, MsgSeqNo}, MS) }; immediately -> - ok = rabbit_channel:confirm(ChPid, [MsgSeqNo]), + ok = rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]), SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ), State1 #state { msg_id_status = dict:erase(MsgId, MS), sender_queues = SQ1 } @@ -744,7 +744,7 @@ process_instruction( {MQ2, PendingCh, dict:store(MsgId, {published, ChPid, MsgSeqNo}, MS)}; immediately -> - ok = rabbit_channel:confirm(ChPid, [MsgSeqNo]), + ok = rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]), {MQ2, PendingCh, MS} end; {{value, {#delivery {}, _EnqueueOnPromotion}}, _MQ2} -> @@ -843,6 +843,11 @@ process_instruction({ack, MsgIds}, [] = MsgIds1 -- MsgIds, %% ASSERTION {ok, State #state { msg_id_ack = MA1, backing_queue_state = BQS1 }}; +process_instruction({fold, MsgFun, AckTags}, + State = #state { backing_queue = BQ, + backing_queue_state = BQS }) -> + BQS1 = BQ:fold(AckTags, MsgFun, BQS), + {ok, State #state { backing_queue_state = BQS1 }}; process_instruction({requeue, MsgIds}, State = #state { backing_queue = BQ, backing_queue_state = BQS, diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index b6d38172b5..dca3bead75 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -28,7 +28,9 @@ -export([enable_cover/0, report_cover/0]). -export([enable_cover/1, report_cover/1]). -export([start_cover/1]). +-export([confirm_to_sender/2]). -export([throw_on_error/2, with_exit_handler/2, filter_exit_map/2]). +-export([is_abnormal_termination/1]). -export([with_user/2, with_user_and_vhost/3]). -export([execute_mnesia_transaction/1]). -export([execute_mnesia_transaction/2]). @@ -44,7 +46,8 @@ -export([sort_field_table/1]). -export([pid_to_string/1, string_to_pid/1]). -export([version_compare/2, version_compare/3]). --export([dict_cons/3, orddict_cons/3, gb_trees_cons/3]). +-export([dict_cons/3, orddict_cons/3, gb_trees_cons/3, + gb_trees_set_insert/3]). -export([gb_trees_fold/3, gb_trees_foreach/2]). -export([get_options/2]). -export([all_module_attributes/1, build_acyclic_graph/3]). @@ -108,7 +111,6 @@ (rabbit_framing:amqp_table(), binary(), rabbit_framing:amqp_field_type(), rabbit_framing:amqp_value()) -> rabbit_framing:amqp_table()). - -spec(r/2 :: (rabbit_types:vhost(), K) -> rabbit_types:r3(rabbit_types:vhost(), K, '_') when is_subtype(K, atom())). @@ -131,6 +133,7 @@ (atom(), thunk(rabbit_types:error(any()) | {ok, A} | A)) -> A). -spec(with_exit_handler/2 :: (thunk(A), thunk(A)) -> A). -spec(filter_exit_map/2 :: (fun ((A) -> B), [A]) -> [B]). +-spec(is_abnormal_termination/1 :: (any()) -> boolean()). -spec(with_user/2 :: (rabbit_types:username(), thunk(A)) -> A). -spec(with_user_and_vhost/3 :: (rabbit_types:username(), rabbit_types:vhost(), thunk(A)) @@ -172,6 +175,7 @@ -spec(dict_cons/3 :: (any(), any(), dict()) -> dict()). -spec(orddict_cons/3 :: (any(), any(), orddict:orddict()) -> orddict:orddict()). -spec(gb_trees_cons/3 :: (any(), any(), gb_tree()) -> gb_tree()). +-spec(gb_trees_set_insert/3 :: (any(), any(), gb_tree()) -> gb_tree()). -spec(gb_trees_fold/3 :: (fun ((any(), any(), A) -> A), A, gb_tree()) -> A). -spec(gb_trees_foreach/2 :: (fun ((any(), any()) -> any()), gb_tree()) -> 'ok'). @@ -372,6 +376,9 @@ report_coverage_percentage(File, Cov, NotCov, Mod) -> end, Mod]). +confirm_to_sender(Pid, MsgSeqNos) -> + gen_server2:cast(Pid, {confirm, MsgSeqNos, self()}). + throw_on_error(E, Thunk) -> case Thunk() of {error, Reason} -> throw({E, Reason}); @@ -397,6 +404,12 @@ filter_exit_map(F, L) -> fun () -> Ref end, fun () -> F(I) end) || I <- L]). +is_abnormal_termination(Reason) + when Reason =:= noproc; Reason =:= noconnection; + Reason =:= normal; Reason =:= shutdown -> false; +is_abnormal_termination({shutdown, _}) -> false; +is_abnormal_termination(_) -> true. + with_user(Username, Thunk) -> fun () -> case mnesia:read({rabbit_user, Username}) of @@ -701,6 +714,15 @@ gb_trees_cons(Key, Value, Tree) -> none -> gb_trees:insert(Key, [Value], Tree) end. +gb_trees_set_insert(Key, Value, Tree) -> + case gb_trees:lookup(Key, Tree) of + {value, Values} -> + Values1 = gb_sets:insert(Value, Values), + gb_trees:update(Key, Values1, Tree); + none -> + gb_trees:insert(Key, gb_sets:singleton(Value), Tree) + end. + gb_trees_fold(Fun, Acc, Tree) -> gb_trees_fold1(Fun, Acc, gb_trees:next(gb_trees:iterator(Tree))). diff --git a/src/rabbit_ssl.erl b/src/rabbit_ssl.erl index 3025d981d4..22ff555ff0 100644 --- a/src/rabbit_ssl.erl +++ b/src/rabbit_ssl.erl @@ -21,7 +21,7 @@ -include_lib("public_key/include/public_key.hrl"). -export([peer_cert_issuer/1, peer_cert_subject/1, peer_cert_validity/1]). --export([peer_cert_subject_items/2]). +-export([peer_cert_subject_items/2, peer_cert_auth_name/1]). %%-------------------------------------------------------------------------- @@ -36,6 +36,8 @@ -spec(peer_cert_validity/1 :: (certificate()) -> string()). -spec(peer_cert_subject_items/2 :: (certificate(), tuple()) -> [string()] | 'not_found'). +-spec(peer_cert_auth_name/1 :: + (certificate()) -> binary() | 'not_found' | 'unsafe'). -endif. @@ -76,6 +78,43 @@ peer_cert_validity(Cert) -> format_asn1_value(End)]) end, Cert). +%% Extract a username from the certificate +peer_cert_auth_name(Cert) -> + {ok, Mode} = application:get_env(rabbit, ssl_cert_login_from), + peer_cert_auth_name(Mode, Cert). + +peer_cert_auth_name(distinguished_name, Cert) -> + case auth_config_sane() of + true -> iolist_to_binary(peer_cert_subject(Cert)); + false -> unsafe + end; + +peer_cert_auth_name(common_name, Cert) -> + %% If there is more than one CN then we join them with "," in a + %% vaguely DN-like way. But this is more just so we do something + %% more intelligent than crashing, if you actually want to escape + %% things properly etc, use DN mode. + case auth_config_sane() of + true -> case peer_cert_subject_items(Cert, ?'id-at-commonName') of + not_found -> not_found; + CNs -> list_to_binary(string:join(CNs, ",")) + end; + false -> unsafe + end. + +auth_config_sane() -> + {ok, Opts} = application:get_env(rabbit, ssl_options), + case {proplists:get_value(fail_if_no_peer_cert, Opts), + proplists:get_value(verify, Opts)} of + {true, verify_peer} -> + true; + {F, V} -> + rabbit_log:warning("SSL certificate authentication disabled, " + "fail_if_no_peer_cert=~p; " + "verify=~p~n", [F, V]), + false + end. + %%-------------------------------------------------------------------------- cert_info(F, Cert) -> diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 29e0428de2..f7e3baa706 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -2366,7 +2366,7 @@ test_dropwhile(VQ0) -> VQ2 = rabbit_variable_queue:dropwhile( fun(#message_properties { expiry = Expiry }) -> Expiry =< 5 - end, VQ1), + end, undefined, VQ1), %% fetch five now VQ3 = lists:foldl(fun (_N, VQN) -> @@ -2383,10 +2383,11 @@ test_dropwhile(VQ0) -> test_dropwhile_varying_ram_duration(VQ0) -> VQ1 = variable_queue_publish(false, 1, VQ0), VQ2 = rabbit_variable_queue:set_ram_duration_target(0, VQ1), - VQ3 = rabbit_variable_queue:dropwhile(fun(_) -> false end, VQ2), + VQ3 = rabbit_variable_queue:dropwhile( + fun(_) -> false end, undefined, VQ2), VQ4 = rabbit_variable_queue:set_ram_duration_target(infinity, VQ3), VQ5 = variable_queue_publish(false, 1, VQ4), - rabbit_variable_queue:dropwhile(fun(_) -> false end, VQ5). + rabbit_variable_queue:dropwhile(fun(_) -> false end, undefined, VQ5). test_variable_queue_dynamic_duration_change(VQ0) -> SegmentSize = rabbit_queue_index:next_segment_boundary(0), diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 52eb168a42..1b32d21197 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -18,11 +18,11 @@ -export([init/3, terminate/2, delete_and_terminate/2, purge/1, publish/4, publish_delivered/5, drain_confirmed/1, - dropwhile/2, fetch/2, ack/2, requeue/2, len/1, is_empty/1, + dropwhile/3, fetch/2, ack/2, requeue/2, len/1, is_empty/1, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3, is_duplicate/2, discard/3, - multiple_routing_keys/0]). + multiple_routing_keys/0, fold/3]). -export([start/1, stop/0]). @@ -581,15 +581,23 @@ drain_confirmed(State = #vqstate { confirmed = C }) -> confirmed = gb_sets:new() }} end. -dropwhile(Pred, State) -> +dropwhile(Pred, MsgFun, State) -> case queue_out(State) of {empty, State1} -> a(State1); {{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} -> - case Pred(MsgProps) of - true -> {_, State2} = internal_fetch(false, MsgStatus, State1), - dropwhile(Pred, State2); - false -> a(in_r(MsgStatus, State1)) + case {Pred(MsgProps), MsgFun} of + {true, undefined} -> + {_, State2} = internal_fetch(false, MsgStatus, State1), + dropwhile(Pred, MsgFun, State2); + {true, _} -> + {{_, _, AckTag, _}, State2} = + internal_fetch(true, MsgStatus, State1), + {MsgStatus, State3} = read_msg(MsgStatus, State2), + MsgFun(MsgStatus#msg_status.msg, AckTag), + dropwhile(Pred, MsgFun, State3); + {false, _} -> + a(in_r(MsgStatus, State1)) end end. @@ -628,11 +636,22 @@ ack(AckTags, State) -> persistent_count = PCount1, ack_out_counter = AckOutCount + length(AckTags) })}. -requeue(AckTags, #vqstate { delta = Delta, - q3 = Q3, - q4 = Q4, - in_counter = InCounter, - len = Len } = State) -> +fold(undefined, State, _AckTags) -> + State; +fold(MsgFun, State = #vqstate{pending_ack = PA}, AckTags) -> + lists:foldl( + fun(SeqId, State1) -> + {MsgStatus, State2} = + read_msg(gb_trees:get(SeqId, PA), State1), + MsgFun(MsgStatus#msg_status.msg, SeqId), + State2 + end, State, AckTags). + +requeue(AckTags, #vqstate { delta = Delta, + q3 = Q3, + q4 = Q4, + in_counter = InCounter, + len = Len } = State) -> {SeqIds, Q4a, MsgIds, State1} = queue_merge(lists:sort(AckTags), Q4, [], beta_limit(Q3), fun publish_alpha/2, State), |
