diff options
| author | Simon MacMullen <simon@lshift.net> | 2010-05-28 14:25:05 +0100 |
|---|---|---|
| committer | Simon MacMullen <simon@lshift.net> | 2010-05-28 14:25:05 +0100 |
| commit | 4c035fae81a0d07529c8c18c115bfdcf02ef8514 (patch) | |
| tree | 2e2d42b94d9c8a0a11455c40e63fb366b1f9b177 | |
| parent | f5bb52f0b28e7c628290e7596ea866c0dcc2f8aa (diff) | |
| parent | ab9024d762cfc1eb3da82d0a0293cbfd69d2bfea (diff) | |
| download | rabbitmq-server-git-4c035fae81a0d07529c8c18c115bfdcf02ef8514.tar.gz | |
Merge default into amqp_0_9_1.
| -rw-r--r-- | Makefile | 10 | ||||
| -rw-r--r-- | docs/rabbitmqctl.1.xml | 2 | ||||
| -rw-r--r-- | include/rabbit.hrl | 3 | ||||
| -rw-r--r-- | packaging/RPMS/Fedora/rabbitmq-server.spec | 6 | ||||
| -rw-r--r-- | packaging/debs/Debian/debian/postrm.in | 19 | ||||
| -rw-r--r-- | packaging/debs/Debian/debian/rules | 2 | ||||
| -rw-r--r-- | packaging/macports/Makefile | 2 | ||||
| -rw-r--r-- | src/delegate.erl | 45 | ||||
| -rw-r--r-- | src/rabbit.erl | 14 | ||||
| -rw-r--r-- | src/rabbit_amqqueue.erl | 23 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 106 | ||||
| -rw-r--r-- | src/rabbit_binary_generator.erl | 9 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 177 | ||||
| -rw-r--r-- | src/rabbit_invariable_queue.erl | 82 | ||||
| -rw-r--r-- | src/rabbit_misc.erl | 12 | ||||
| -rw-r--r-- | src/rabbit_plugin_activator.erl | 1 | ||||
| -rw-r--r-- | src/rabbit_reader.erl | 39 | ||||
| -rw-r--r-- | src/rabbit_reader_queue_collector.erl | 108 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 20 | ||||
| -rw-r--r-- | src/tcp_acceptor.erl | 7 |
20 files changed, 439 insertions, 248 deletions
@@ -56,7 +56,7 @@ TARGET_SRC_DIR=dist/$(TARBALL_NAME) SIBLING_CODEGEN_DIR=../rabbitmq-codegen/ AMQP_CODEGEN_DIR=$(shell [ -d $(SIBLING_CODEGEN_DIR) ] && echo $(SIBLING_CODEGEN_DIR) || echo codegen) -AMQP_SPEC_JSON_PATH=$(AMQP_CODEGEN_DIR)/amqp-0.9.1.json +AMQP_SPEC_JSON_FILES=$(AMQP_CODEGEN_DIR)/amqp-0.9.1.json ERL_CALL=erl_call -sname $(RABBITMQ_NODENAME) -e @@ -81,11 +81,11 @@ $(EBIN_DIR)/rabbit.app: $(EBIN_DIR)/rabbit_app.in $(BEAM_TARGETS) generate_app $(EBIN_DIR)/%.beam: erlc $(ERLC_OPTS) -pa $(EBIN_DIR) $< -$(INCLUDE_DIR)/rabbit_framing.hrl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_PATH) - $(PYTHON) codegen.py header $(AMQP_SPEC_JSON_PATH) $@ +$(INCLUDE_DIR)/rabbit_framing.hrl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_FILES) + $(PYTHON) codegen.py header $(AMQP_SPEC_JSON_FILES) $@ -$(SOURCE_DIR)/rabbit_framing.erl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_PATH) - $(PYTHON) codegen.py body $(AMQP_SPEC_JSON_PATH) $@ +$(SOURCE_DIR)/rabbit_framing.erl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_FILES) + $(PYTHON) codegen.py body $(AMQP_SPEC_JSON_FILES) $@ dialyze: $(BEAM_TARGETS) $(BASIC_PLT) $(ERL_EBIN) -eval \ diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index 5e2668c1a6..a2038cf0e9 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -271,7 +271,7 @@ <variablelist> <varlistentry> - <term><cmdsynopsis><command>cluster</command> <arg choice="req"><replaceable>clusternode</replaceable></arg></cmdsynopsis></term> + <term><cmdsynopsis><command>cluster</command> <arg choice="req" role="usage-option-list"><replaceable>clusternode</replaceable> ...</arg></cmdsynopsis></term> <listitem> <variablelist> <varlistentry> diff --git a/include/rabbit.hrl b/include/rabbit.hrl index c2dad74475..d4327980be 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -51,7 +51,8 @@ -record(exchange, {name, type, durable, arguments}). --record(amqqueue, {name, durable, auto_delete, exclusive_owner = none, arguments, pid}). +-record(amqqueue, {name, durable, auto_delete, exclusive_owner = none, + arguments, pid}). %% mnesia doesn't like unary records, so we add a dummy 'value' field -record(route, {binding, value = const}). diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec index 6926261f79..00066a15f7 100644 --- a/packaging/RPMS/Fedora/rabbitmq-server.spec +++ b/packaging/RPMS/Fedora/rabbitmq-server.spec @@ -107,6 +107,12 @@ if [ $1 = 0 ]; then # Leave rabbitmq user and group fi +# Clean out plugin activation state, both on uninstall and upgrade +rm -rf %{_rabbit_erllibdir}/priv +for ext in rel script boot ; do + rm -f %{_rabbit_erllibdir}/ebin/rabbit.$ext +done + %files -f ../%{name}.files %defattr(-,root,root,-) %attr(0750, rabbitmq, rabbitmq) %dir %{_localstatedir}/lib/rabbitmq diff --git a/packaging/debs/Debian/debian/postrm.in b/packaging/debs/Debian/debian/postrm.in index bfcf1f530e..5290de9b17 100644 --- a/packaging/debs/Debian/debian/postrm.in +++ b/packaging/debs/Debian/debian/postrm.in @@ -18,6 +18,13 @@ set -e # for details, see http://www.debian.org/doc/debian-policy/ or # the debian-policy package +remove_plugin_traces() { + # Remove traces of plugins + rm -rf @RABBIT_LIB@/priv @RABBIT_LIB@/plugins + for ext in rel script boot ; do + rm -f @RABBIT_LIB@/ebin/rabbit.$ext + done +} case "$1" in purge) @@ -34,11 +41,7 @@ case "$1" in if [ -d /etc/rabbitmq ]; then rm -r /etc/rabbitmq fi - # Remove traces of plugins - rm -rf @RABBIT_LIB@/priv @RABBIT_LIB@/plugins - for ext in rel script boot ; do - rm -f @RABBIT_LIB@/ebin/rabbit.$ext - done + remove_plugin_traces if getent passwd rabbitmq >/dev/null; then # Stop epmd if run by the rabbitmq user pkill -u rabbitmq epmd || : @@ -50,7 +53,11 @@ case "$1" in fi ;; - remove|upgrade|failed-upgrade|abort-install|abort-upgrade|disappear) + remove|upgrade) + remove_plugin_traces + ;; + + failed-upgrade|abort-install|abort-upgrade|disappear) ;; *) diff --git a/packaging/debs/Debian/debian/rules b/packaging/debs/Debian/debian/rules index 45659602f5..1916651426 100644 --- a/packaging/debs/Debian/debian/rules +++ b/packaging/debs/Debian/debian/rules @@ -13,7 +13,7 @@ DOCDIR=$(DEB_DESTDIR)usr/share/doc/rabbitmq-server/ install/rabbitmq-server:: mkdir -p $(DOCDIR) - rm $(RABBIT_LIB)LICENSE* + rm $(RABBIT_LIB)LICENSE* $(RABBIT_LIB)INSTALL* for script in rabbitmqctl rabbitmq-server rabbitmq-multi; do \ install -p -D -m 0755 debian/rabbitmq-script-wrapper $(DEB_DESTDIR)usr/sbin/$$script; \ done diff --git a/packaging/macports/Makefile b/packaging/macports/Makefile index 0ef7dd5e73..4ad4c30b2c 100644 --- a/packaging/macports/Makefile +++ b/packaging/macports/Makefile @@ -39,7 +39,7 @@ macports: dirs $(DEST)/Portfile $(DEST)/files/rabbitmq-script-wrapper cp patch-org.macports.rabbitmq-server.plist.diff $(DEST)/files if [ -n "$(MACPORTS_USERHOST)" ] ; then \ - tar cf - -C $(MACPORTS_DIR) . | ssh $(SSH_OPTS) lshift@macrabbit ' \ + tar cf - -C $(MACPORTS_DIR) . | ssh $(SSH_OPTS) $(MACPORTS_USERHOST) ' \ d="/tmp/mkportindex.$$$$" ; \ mkdir $$d \ && cd $$d \ diff --git a/src/delegate.erl b/src/delegate.erl index 12eb814f8f..98353453ff 100644 --- a/src/delegate.erl +++ b/src/delegate.erl @@ -63,7 +63,7 @@ start_link(Hash) -> gen_server2:start_link({local, server(Hash)}, ?MODULE, [], []). invoke(Pid, Fun) when is_pid(Pid) -> - [Res] = invoke_per_node([{node(Pid), [Pid]}], Fun), + [Res] = invoke_per_node(split_delegate_per_node([Pid]), Fun), case Res of {ok, Result, _} -> Result; @@ -83,7 +83,7 @@ invoke(Pids, Fun) when is_list(Pids) -> invoke_per_node(split_delegate_per_node(Pids), Fun)). invoke_no_result(Pid, Fun) when is_pid(Pid) -> - invoke_no_result_per_node([{node(Pid), [Pid]}], Fun), + invoke_no_result_per_node(split_delegate_per_node([Pid]), Fun), ok; invoke_no_result(Pids, Fun) when is_list(Pids) -> @@ -99,32 +99,37 @@ internal_cast(Node, Thunk) when is_atom(Node) -> gen_server2:cast({remote_server(Node), Node}, {thunk, Thunk}). split_delegate_per_node(Pids) -> - orddict:to_list( - lists:foldl( - fun (Pid, D) -> - orddict:update(node(Pid), - fun (Pids1) -> [Pid | Pids1] end, - [Pid], D) - end, - orddict:new(), Pids)). + LocalNode = node(), + {Local, Remote} = + lists:foldl( + fun (Pid, {L, D}) -> + Node = node(Pid), + case Node of + LocalNode -> {[Pid|L], D}; + _ -> {L, orddict:append(Node, Pid, D)} + end + end, + {[], orddict:new()}, Pids), + {Local, orddict:to_list(Remote)}. -invoke_per_node([{Node, Pids}], Fun) when Node == node() -> - safe_invoke(Pids, Fun); invoke_per_node(NodePids, Fun) -> lists:append(delegate_per_node(NodePids, Fun, fun internal_call/2)). -invoke_no_result_per_node([{Node, Pids}], Fun) when Node == node() -> - %% This is not actually async! However, in practice Fun will - %% always be something that does a gen_server:cast or similar, so - %% I don't think it's a problem unless someone misuses this - %% function. Making this *actually* async would be painful as we - %% can't spawn at this point or we break effect ordering. - safe_invoke(Pids, Fun); invoke_no_result_per_node(NodePids, Fun) -> delegate_per_node(NodePids, Fun, fun internal_cast/2), ok. -delegate_per_node(NodePids, Fun, DelegateFun) -> +delegate_per_node({LocalPids, NodePids}, Fun, DelegateFun) -> + %% In the case where DelegateFun is internal_cast, the safe_invoke + %% is not actually async! However, in practice Fun will always be + %% something that does a gen_server:cast or similar, so I don't + %% think it's a problem unless someone misuses this + %% function. Making this *actually* async would be painful as we + %% can't spawn at this point or we break effect ordering. + [safe_invoke(LocalPids, Fun)| + delegate_per_remote_node(NodePids, Fun, DelegateFun)]. + +delegate_per_remote_node(NodePids, Fun, DelegateFun) -> Self = self(), %% Note that this is unsafe if the Fun requires reentrancy to the %% local_server. I.e. if self() == local_server(Node) then we'll diff --git a/src/rabbit.erl b/src/rabbit.erl index 525558d88f..09a190148f 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -299,6 +299,18 @@ run_boot_step({StepName, Attributes}) -> ok end. +module_attributes(Module) -> + case catch Module:module_info(attributes) of + {'EXIT', {undef, [{Module, module_info, _} | _]}} -> + io:format("WARNING: module ~p not found, so not scanned for boot steps.~n", + [Module]), + []; + {'EXIT', Reason} -> + exit(Reason); + V -> + V + end. + boot_steps() -> AllApps = [App || {App, _, _} <- application:loaded_applications()], Modules = lists:usort( @@ -310,7 +322,7 @@ boot_steps() -> lists:flatmap(fun (Module) -> [{StepName, Attributes} || {rabbit_boot_step, [{StepName, Attributes}]} - <- Module:module_info(attributes)] + <- module_attributes(Module)] end, Modules), sort_boot_steps(UnsortedSteps). diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index b237be8a48..08241027e5 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -65,8 +65,8 @@ 'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}). -spec(start/0 :: () -> 'ok'). --spec(declare/5 :: (queue_name(), boolean(), boolean(), amqp_table(), maybe(pid())) -> - amqqueue()). +-spec(declare/5 :: (queue_name(), boolean(), boolean(), amqp_table(), + maybe(pid())) -> amqqueue()). -spec(lookup/1 :: (queue_name()) -> {'ok', amqqueue()} | not_found()). -spec(with/2 :: (queue_name(), qfun(A)) -> A | not_found()). -spec(with_or_die/2 :: (queue_name(), qfun(A)) -> A). @@ -101,8 +101,7 @@ -spec(basic_consume/7 :: (amqqueue(), boolean(), pid(), pid() | 'undefined', ctag(), boolean(), any()) -> - 'ok' | {'error', 'queue_owned_by_another_connection' | - 'exclusive_consume_unavailable'}). + 'ok' | {'error', 'exclusive_consume_unavailable'}). -spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok'). -spec(notify_sent/2 :: (pid(), pid()) -> 'ok'). -spec(unblock/2 :: (pid(), pid()) -> 'ok'). @@ -145,7 +144,7 @@ find_durable_queues() -> recover_durable_queues(DurableQueues) -> Qs = [start_queue_process(Q) || Q <- DurableQueues], [Q || Q <- Qs, gen_server2:call(Q#amqqueue.pid, {init, true}) == Q]. - + declare(QueueName, Durable, AutoDelete, Args, Owner) -> Q = start_queue_process(#amqqueue{name = QueueName, durable = Durable, @@ -320,10 +319,12 @@ flush_all(QPids, ChPid) -> delegate:invoke_no_result( QPids, fun (QPid) -> gen_server2:cast(QPid, {flush, ChPid}) end). -internal_delete2(QueueName) -> +internal_delete1(QueueName) -> ok = mnesia:delete({rabbit_queue, QueueName}), ok = mnesia:delete({rabbit_durable_queue, QueueName}), - %% this is last because it returns a post-transaction callback + %% we want to execute some things, as + %% decided by rabbit_exchange, after the + %% transaction. rabbit_exchange:delete_queue_bindings(QueueName). internal_delete(QueueName) -> @@ -332,14 +333,10 @@ internal_delete(QueueName) -> fun () -> case mnesia:wread({rabbit_queue, QueueName}) of [] -> {error, not_found}; - [_] -> internal_delete2(QueueName) + [_] -> internal_delete1(QueueName) end end) of - Err = {error, _} -> - Err; - %% we want to execute some things, as - %% decided by rabbit_exchange, after the - %% transaction. + Err = {error, _} -> Err; PostHook -> PostHook(), ok diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index ff1b8f1baa..e7c926643f 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -132,6 +132,23 @@ code_change(_OldVsn, State, _Extra) -> %%---------------------------------------------------------------------------- +declare(Recover, From, + State = #q{q = Q = #amqqueue{name = QName, durable = IsDurable}, + backing_queue = BQ, backing_queue_state = undefined}) -> + case rabbit_amqqueue:internal_declare(Q, Recover) of + not_found -> {stop, normal, not_found, State}; + Q -> gen_server2:reply(From, Q), + ok = file_handle_cache:register_callback( + rabbit_amqqueue, set_maximum_since_use, + [self()]), + ok = rabbit_memory_monitor:register( + self(), {rabbit_amqqueue, + set_ram_duration_target, [self()]}), + BQS = BQ:init(QName, IsDurable, Recover), + noreply(State#q{backing_queue_state = BQS}); + Q1 -> {stop, normal, Q1, State} + end. + terminate_shutdown(Fun, State) -> State1 = #q{backing_queue = BQ, backing_queue_state = BQS} = stop_sync_timer(stop_rate_timer(State)), @@ -484,8 +501,8 @@ i(pid, _) -> self(); i(owner_pid, #q{q = #amqqueue{exclusive_owner = none}}) -> ''; -i(owner_pid, #q{q = #amqqueue{exclusive_owner = ReaderPid}}) -> - ReaderPid; +i(owner_pid, #q{q = #amqqueue{exclusive_owner = ExclusiveOwner}}) -> + ExclusiveOwner; i(exclusive_consumer_pid, #q{exclusive_consumer = none}) -> ''; i(exclusive_consumer_pid, #q{exclusive_consumer = {ChPid, _ConsumerTag}}) -> @@ -514,50 +531,24 @@ i(Item, _) -> %--------------------------------------------------------------------------- handle_call({init, Recover}, From, - State = #q{q = Q = #amqqueue{name = QName, durable = IsDurable, - exclusive_owner = ExclusiveOwner}, - backing_queue = BQ, backing_queue_state = undefined}) -> - Declare = - fun() -> - case rabbit_amqqueue:internal_declare(Q, Recover) of - not_found -> - {stop, normal, not_found, State}; - Q -> - gen_server2:reply(From, Q), - ok = file_handle_cache:register_callback( - rabbit_amqqueue, set_maximum_since_use, - [self()]), - ok = rabbit_memory_monitor:register( - self(), {rabbit_amqqueue, - set_ram_duration_target, [self()]}), - noreply( - State#q{backing_queue_state = - BQ:init(QName, IsDurable, Recover)}); - Q1 -> - {stop, normal, Q1, State} - end - end, + State = #q{q = #amqqueue{exclusive_owner = none}}) -> + declare(Recover, From, State); - case ExclusiveOwner of - none -> - Declare(); - Owner -> - case rpc:call(node(Owner), erlang, is_process_alive, [Owner]) of - true -> - erlang:monitor(process, Owner), - Declare(); - _ -> - case Recover of - true -> ok; - _ -> rabbit_log:warning( - "Queue ~p exclusive owner went away~n", - [QName]) - end, - %% Rely on terminate to delete the queue. - {stop, normal, not_found, - State#q{backing_queue_state = - BQ:init(QName, IsDurable, Recover)}} - end +handle_call({init, Recover}, From, + State = #q{q = #amqqueue{exclusive_owner = Owner}}) -> + case rpc:call(node(Owner), erlang, is_process_alive, [Owner]) of + true -> erlang:monitor(process, Owner), + declare(Recover, From, State); + _ -> #q{q = #amqqueue{name = QName, durable = IsDurable}, + backing_queue = BQ, backing_queue_state = undefined} = State, + case Recover of + true -> ok; + _ -> rabbit_log:warning( + "Queue ~p exclusive owner went away~n", [QName]) + end, + BQS = BQ:init(QName, IsDurable, Recover), + %% Rely on terminate to delete the queue. + {stop, normal, not_found, State#q{backing_queue_state = BQS}} end; handle_call(info, _From, State) -> @@ -642,16 +633,15 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, ok -> C = #cr{consumer_count = ConsumerCount} = ch_record(ChPid), Consumer = #consumer{tag = ConsumerTag, - ack_required = not NoAck}, + ack_required = not NoAck}, store_ch_record(C#cr{consumer_count = ConsumerCount +1, limiter_pid = LimiterPid}), - case ConsumerCount of - 0 -> rabbit_limiter:register(LimiterPid, self()); - _ -> ok - end, - ExclusiveConsumer = case ExclusiveConsume of - true -> {ChPid, ConsumerTag}; - false -> ExistingHolder + ok = case ConsumerCount of + 0 -> rabbit_limiter:register(LimiterPid, self()); + _ -> ok + end, + ExclusiveConsumer = if ExclusiveConsume -> {ChPid, ConsumerTag}; + true -> ExistingHolder end, State1 = State#q{has_had_consumers = true, exclusive_consumer = ExclusiveConsumer}, @@ -741,7 +731,6 @@ handle_call({requeue, AckTags, ChPid}, _From, State) -> handle_call({maybe_run_queue_via_backing_queue, Fun}, _From, State) -> reply(ok, maybe_run_queue_via_backing_queue(Fun, State)). - handle_cast({deliver, Txn, Message, ChPid}, State) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. {_Delivered, NewState} = deliver_or_enqueue(Txn, ChPid, Message, State), @@ -817,8 +806,13 @@ handle_cast({set_maximum_since_use, Age}, State) -> noreply(State). handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, - State = #q{q= #amqqueue{ exclusive_owner = DownPid}}) -> - %% Exclusively owned queues must disappear with their owner. + State = #q{q = #amqqueue{exclusive_owner = DownPid}}) -> + %% Exclusively owned queues must disappear with their owner. In + %% the case of clean shutdown we delete the queue synchronously in + %% the reader - although not required by the spec this seems to + %% match what people expect (see bug 21824). However we need this + %% monitor-and-async- delete in case the connection goes away + %% unexpectedly. {stop, normal, State}; handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) -> case handle_ch_down(DownPid, State) of diff --git a/src/rabbit_binary_generator.erl b/src/rabbit_binary_generator.erl index ed84373585..27a1275a31 100644 --- a/src/rabbit_binary_generator.erl +++ b/src/rabbit_binary_generator.erl @@ -118,10 +118,11 @@ build_content_frames(SizeAcc, FramesAcc, FragSizeRem, FragAcc, [Frag | Frags], BodyPayloadMax, ChannelInt) -> Size = size(Frag), {NewFragSizeRem, NewFragAcc, NewFrags} = - case Size =< FragSizeRem of - true -> {FragSizeRem - Size, [Frag | FragAcc], Frags}; - false -> <<Head:FragSizeRem/binary, Tail/binary>> = Frag, - {0, [Head | FragAcc], [Tail | Frags]} + if Size == 0 -> {FragSizeRem, FragAcc, Frags}; + Size =< FragSizeRem -> {FragSizeRem - Size, [Frag | FragAcc], Frags}; + true -> <<Head:FragSizeRem/binary, Tail/binary>> = + Frag, + {0, [Head | FragAcc], [Tail | Frags]} end, build_content_frames(SizeAcc, FramesAcc, NewFragSizeRem, NewFragAcc, NewFrags, BodyPayloadMax, ChannelInt). diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index e29abb8544..57d29d5ed9 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -35,7 +35,7 @@ -behaviour(gen_server2). --export([start_link/5, do/2, do/3, shutdown/1]). +-export([start_link/6, do/2, do/3, shutdown/1]). -export([send_command/2, deliver/4, conserve_memory/2, flushed/2]). -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]). @@ -46,7 +46,7 @@ transaction_id, tx_participants, next_tag, uncommitted_ack_q, unacked_message_q, username, virtual_host, most_recently_declared_queue, - consumer_mapping, blocking}). + consumer_mapping, blocking, queue_collector_pid}). -define(MAX_PERMISSION_CACHE_SIZE, 12). @@ -66,8 +66,8 @@ -ifdef(use_specs). --spec(start_link/5 :: - (channel_number(), pid(), pid(), username(), vhost()) -> pid()). +-spec(start_link/6 :: + (channel_number(), pid(), pid(), username(), vhost(), pid()) -> pid()). -spec(do/2 :: (pid(), amqp_method()) -> 'ok'). -spec(do/3 :: (pid(), amqp_method(), maybe(content())) -> 'ok'). -spec(shutdown/1 :: (pid()) -> 'ok'). @@ -86,10 +86,10 @@ %%---------------------------------------------------------------------------- -start_link(Channel, ReaderPid, WriterPid, Username, VHost) -> +start_link(Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid) -> {ok, Pid} = gen_server2:start_link( ?MODULE, [Channel, ReaderPid, WriterPid, - Username, VHost], []), + Username, VHost, CollectorPid], []), Pid. do(Pid, Method) -> @@ -135,10 +135,9 @@ info_all(Items) -> %%--------------------------------------------------------------------------- -init([Channel, ReaderPid, WriterPid, Username, VHost]) -> +init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid]) -> process_flag(trap_exit, true), link(WriterPid), - rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}), ok = pg_local:join(rabbit_channels, self()), {ok, #ch{state = starting, channel = Channel, @@ -154,7 +153,8 @@ init([Channel, ReaderPid, WriterPid, Username, VHost]) -> virtual_host = VHost, most_recently_declared_queue = <<>>, consumer_mapping = dict:new(), - blocking = dict:new()}, + blocking = dict:new(), + queue_collector_pid = CollectorPid}, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -299,13 +299,20 @@ check_write_permitted(Resource, #ch{ username = Username}) -> check_read_permitted(Resource, #ch{ username = Username}) -> check_resource_access(Username, Resource, read). -check_queue_exclusivity(ReaderPid, Q) -> - case Q of - #amqqueue{ exclusive_owner = none} -> Q; - #amqqueue{ exclusive_owner = ReaderPid } -> Q; - _ -> rabbit_misc:protocol_error(resource_locked, - "cannot obtain exclusive access to locked ~s", - [rabbit_misc:rs(Q#amqqueue.name)]) +with_exclusive_access_or_die(QName, ReaderPid, F) -> + case rabbit_amqqueue:with_or_die( + QName, fun (Q = #amqqueue{exclusive_owner = Owner}) + when Owner =:= none orelse Owner =:= ReaderPid -> + F(Q); + (_) -> + {error, wrong_exclusive_owner} + end) of + {error, wrong_exclusive_owner} -> + rabbit_misc:protocol_error( + resource_locked, "cannot obtain exclusive access to locked ~s", + [rabbit_misc:rs(QName)]); + Other -> + Other end. expand_queue_name_shortcut(<<>>, #ch{ most_recently_declared_queue = <<>> }) -> @@ -362,6 +369,7 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) -> end. handle_method(#'channel.open'{}, _, State = #ch{state = starting}) -> + rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}), {reply, #'channel.open_ok'{}, State#ch{state = running}}; handle_method(#'channel.open'{}, _, _State) -> @@ -446,10 +454,10 @@ handle_method(#'basic.get'{queue = QueueNameBin, next_tag = DeliveryTag }) -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), check_read_permitted(QueueName, State), - case rabbit_amqqueue:with_or_die( + case with_exclusive_access_or_die( QueueName, + ReaderPid, fun (Q) -> - check_queue_exclusivity(ReaderPid, Q), rabbit_amqqueue:basic_get(Q, self(), NoAck) end) of {ok, MessageCount, @@ -493,10 +501,9 @@ handle_method(#'basic.consume'{queue = QueueNameBin, %% In order to ensure that the consume_ok gets sent before %% any messages are sent to the consumer, we get the queue %% process to send the consume_ok on our behalf. - case rabbit_amqqueue:with_or_die( - QueueName, + case with_exclusive_access_or_die( + QueueName, ReaderPid, fun (Q) -> - check_queue_exclusivity(ReaderPid, Q), rabbit_amqqueue:basic_consume( Q, NoAck, self(), LimiterPid, ActualConsumerTag, ExclusiveConsume, @@ -684,46 +691,53 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin, return_ok(State, NoWait, #'exchange.delete_ok'{}) end; -handle_method(#'queue.declare'{queue = QueueNameBin, - passive = false, - durable = Durable, - exclusive = ExclusiveDeclare, +handle_method(#'queue.declare'{queue = QueueNameBin, + passive = false, + durable = Durable, + exclusive = ExclusiveDeclare, auto_delete = AutoDelete, - nowait = NoWait, - arguments = Args}, - _, State = #ch { virtual_host = VHostPath, - reader_pid = ReaderPid }) -> + nowait = NoWait, + arguments = Args}, + _, State = #ch{virtual_host = VHostPath, + reader_pid = ReaderPid, + queue_collector_pid = CollectorPid}) -> Owner = case ExclusiveDeclare of true -> ReaderPid; false -> none end, %% We use this in both branches, because queue_declare may yet return an %% existing queue. - Finish = - fun(Q) -> - case Q of - %% "equivalent" rule. NB: we don't pay attention to - %% anything in the arguments table, so for the sake of the - %% "equivalent" rule, all tables of arguments are - %% semantically equivalant. - Matched = #amqqueue{name = QueueName, - durable = Durable, %% i.e., as supplied - exclusive_owner = Owner, - auto_delete = AutoDelete %% i.e,. as supplied - } -> - check_configure_permitted(QueueName, State), - Matched; - %% exclusivity trumps non-equivalence arbitrarily - #amqqueue{name = QueueName, exclusive_owner = ExclusiveOwner} - when ExclusiveOwner =/= Owner -> - rabbit_misc:protocol_error(resource_locked, - "cannot obtain exclusive access to locked ~s", - [rabbit_misc:rs(QueueName)]); - #amqqueue{name = QueueName} -> - rabbit_misc:protocol_error(channel_error, - "parameters for ~s not equivalent", - [rabbit_misc:rs(QueueName)]) - end + Finish = + fun (#amqqueue{name = QueueName, exclusive_owner = Owner1, + durable = Durable1, auto_delete = AutoDelete1} = Q) + %% "equivalent" rule. NB: we don't pay attention to + %% anything in the arguments table, so for the sake of the + %% "equivalent" rule, all tables of arguments are + %% semantically equivalant. + when Owner =:= Owner1, + Durable =:= Durable1, + AutoDelete =:= AutoDelete1 -> + check_configure_permitted(QueueName, State), + %% We need to notify the reader within the channel + %% process so that we can be sure there are no + %% outstanding exclusive queues being declared as the + %% connection shuts down. + case Owner of + none -> ok; + _ -> ok = rabbit_reader_queue_collector:register_exclusive_queue(CollectorPid, Q) + end, + Q; + (#amqqueue{name = QueueName, exclusive_owner = Owner1}) + when Owner =:= Owner1 -> + rabbit_misc:protocol_error(channel_error, + "parameters for ~s not equivalent", + [rabbit_misc:rs(QueueName)]); + (#amqqueue{name = QueueName}) -> + %% exclusivity trumps non-equivalence arbitrarily + rabbit_misc:protocol_error( + resource_locked, + "cannot obtain exclusive access to locked ~s", + [rabbit_misc:rs(QueueName)]) end, Q = case rabbit_amqqueue:with( rabbit_misc:r(VHostPath, queue, QueueNameBin), @@ -735,37 +749,33 @@ handle_method(#'queue.declare'{queue = QueueNameBin, Other -> check_name('queue', Other) end, QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin), - check_configure_permitted(QueueName, State), - Finish(rabbit_amqqueue:declare(QueueName, Durable, AutoDelete, Args, Owner)); - Found -> Found + Finish(rabbit_amqqueue:declare(QueueName, Durable, AutoDelete, + Args, Owner)); + #amqqueue{} = Other -> + Other end, return_queue_declare_ok(State, NoWait, Q); -handle_method(#'queue.declare'{queue = QueueNameBin, +handle_method(#'queue.declare'{queue = QueueNameBin, passive = true, - nowait = NoWait}, - _, State = #ch{ virtual_host = VHostPath, - reader_pid = ReaderPid }) -> + nowait = NoWait}, + _, State = #ch{virtual_host = VHostPath, + reader_pid = ReaderPid}) -> QueueName = rabbit_misc:r(VHostPath, queue, QueueNameBin), check_configure_permitted(QueueName, State), - CheckExclusive = fun(Q) -> check_queue_exclusivity(ReaderPid, Q) end, - Q = rabbit_amqqueue:with_or_die(QueueName, CheckExclusive), + Q = with_exclusive_access_or_die(QueueName, ReaderPid, fun (Q) -> Q end), return_queue_declare_ok(State, NoWait, Q); handle_method(#'queue.delete'{queue = QueueNameBin, if_unused = IfUnused, if_empty = IfEmpty, - nowait = NoWait - }, - _, State = #ch{ reader_pid = ReaderPid }) -> + nowait = NoWait}, + _, State = #ch{reader_pid = ReaderPid}) -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), check_configure_permitted(QueueName, State), - case rabbit_amqqueue:with_or_die( - QueueName, - fun (Q) -> - check_queue_exclusivity(ReaderPid, Q), - rabbit_amqqueue:delete(Q, IfUnused, IfEmpty) - end) of + case with_exclusive_access_or_die( + QueueName, ReaderPid, + fun (Q) -> rabbit_amqqueue:delete(Q, IfUnused, IfEmpty) end) of {error, in_use} -> rabbit_misc:protocol_error( precondition_failed, "~s in use", [rabbit_misc:rs(QueueName)]); @@ -774,8 +784,7 @@ handle_method(#'queue.delete'{queue = QueueNameBin, precondition_failed, "~s not empty", [rabbit_misc:rs(QueueName)]); {ok, PurgedMessageCount} -> return_ok(State, NoWait, - #'queue.delete_ok'{ - message_count = PurgedMessageCount}) + #'queue.delete_ok'{message_count = PurgedMessageCount}) end; handle_method(#'queue.bind'{queue = QueueNameBin, @@ -797,15 +806,12 @@ handle_method(#'queue.unbind'{queue = QueueNameBin, handle_method(#'queue.purge'{queue = QueueNameBin, nowait = NoWait}, - _, State = #ch{ reader_pid = ReaderPid }) -> + _, State = #ch{reader_pid = ReaderPid}) -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), check_read_permitted(QueueName, State), - {ok, PurgedMessageCount} = rabbit_amqqueue:with_or_die( - QueueName, - fun (Q) -> - check_queue_exclusivity(ReaderPid, Q), - rabbit_amqqueue:purge(Q) - end), + {ok, PurgedMessageCount} = with_exclusive_access_or_die( + QueueName, ReaderPid, + fun (Q) -> rabbit_amqqueue:purge(Q) end), return_ok(State, NoWait, #'queue.purge_ok'{message_count = PurgedMessageCount}); @@ -881,8 +887,13 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, State), ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_read_permitted(ExchangeName, State), - CheckExclusive = fun(_X, Q) -> check_queue_exclusivity(ReaderPid, Q) end, - case Fun(ExchangeName, QueueName, ActualRoutingKey, Arguments, CheckExclusive) of + CheckExclusive = + fun(_X, Q) -> + with_exclusive_access_or_die(Q#amqqueue.name, + ReaderPid, fun(_Q1)-> ok end) + end, + case Fun(ExchangeName, QueueName, ActualRoutingKey, Arguments, + CheckExclusive) of {error, exchange_not_found} -> rabbit_misc:not_found(ExchangeName); {error, queue_not_found} -> diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl index b4fd91560f..a7ca20c80b 100644 --- a/src/rabbit_invariable_queue.erl +++ b/src/rabbit_invariable_queue.erl @@ -43,7 +43,7 @@ -include("rabbit.hrl"). --record(iv_state, { queue, qname, len, pending_ack }). +-record(iv_state, { queue, qname, durable, len, pending_ack }). -record(tx, { pending_messages, pending_acks, is_persistent }). -ifdef(use_specs). @@ -66,18 +66,23 @@ init(QName, IsDurable, Recover) -> true -> rabbit_persister:queue_content(QName); false -> [] end), - #iv_state { queue = Q, qname = QName, len = queue:len(Q), + #iv_state { queue = Q, + qname = QName, + durable = IsDurable, + len = queue:len(Q), pending_ack = dict:new() }. terminate(State) -> State #iv_state { queue = queue:new(), len = 0, pending_ack = dict:new() }. -delete_and_terminate(State = #iv_state { qname = QName, pending_ack = PA }) -> - ok = persist_acks(none, QName, dict:fetch_keys(PA), PA), +delete_and_terminate(State = #iv_state { qname = QName, durable = IsDurable, + pending_ack = PA }) -> + ok = persist_acks(QName, IsDurable, none, dict:fetch_keys(PA), PA), {_PLen, State1} = purge(State), terminate(State1). -purge(State = #iv_state { len = Len, queue = Q, qname = QName }) -> +purge(State = #iv_state { queue = Q, qname = QName, durable = IsDurable, + len = Len }) -> %% We do not purge messages pending acks. {AckTags, PA} = rabbit_misc:queue_fold( @@ -85,57 +90,63 @@ purge(State = #iv_state { len = Len, queue = Q, qname = QName }) -> Acc; ({Msg = #basic_message { guid = Guid }, IsDelivered}, {AckTagsN, PAN}) -> - ok = persist_delivery(QName, Msg, IsDelivered), + ok = persist_delivery(QName, IsDurable, IsDelivered, Msg), {[Guid | AckTagsN], dict:store(Guid, Msg, PAN)} end, {[], dict:new()}, Q), - ok = persist_acks(none, QName, AckTags, PA), + ok = persist_acks(QName, IsDurable, none, AckTags, PA), {Len, State #iv_state { len = 0, queue = queue:new() }}. -publish(Msg, State = #iv_state { queue = Q, qname = QName, len = Len }) -> - ok = persist_message(none, QName, Msg), +publish(Msg, State = #iv_state { queue = Q, qname = QName, durable = IsDurable, + len = Len }) -> + ok = persist_message(QName, IsDurable, none, Msg), State #iv_state { queue = queue:in({Msg, false}, Q), len = Len + 1 }. publish_delivered(false, _Msg, State) -> {blank_ack, State}; publish_delivered(true, Msg = #basic_message { guid = Guid }, - State = #iv_state { qname = QName, len = 0, - pending_ack = PA }) -> - ok = persist_message(none, QName, Msg), - ok = persist_delivery(QName, Msg, false), + State = #iv_state { qname = QName, durable = IsDurable, + len = 0, pending_ack = PA }) -> + ok = persist_message(QName, IsDurable, none, Msg), + ok = persist_delivery(QName, IsDurable, false, Msg), {Guid, State #iv_state { pending_ack = dict:store(Guid, Msg, PA) }}. fetch(_AckRequired, State = #iv_state { len = 0 }) -> {empty, State}; -fetch(AckRequired, State = #iv_state { queue = Q, qname = QName, len = Len, +fetch(AckRequired, State = #iv_state { len = Len, queue = Q, qname = QName, + durable = IsDurable, pending_ack = PA }) -> {{value, {Msg = #basic_message { guid = Guid }, IsDelivered}}, Q1} = queue:out(Q), Len1 = Len - 1, - ok = persist_delivery(QName, Msg, IsDelivered), + ok = persist_delivery(QName, IsDurable, IsDelivered, Msg), PA1 = dict:store(Guid, Msg, PA), {AckTag, PA2} = case AckRequired of true -> {Guid, PA1}; - false -> ok = persist_acks(none, QName, [Guid], PA1), + false -> ok = persist_acks(QName, IsDurable, none, + [Guid], PA1), {blank_ack, PA} end, {{Msg, IsDelivered, AckTag, Len1}, State #iv_state { queue = Q1, len = Len1, pending_ack = PA2 }}. -ack(AckTags, State = #iv_state { qname = QName, pending_ack = PA }) -> - ok = persist_acks(none, QName, AckTags, PA), +ack(AckTags, State = #iv_state { qname = QName, durable = IsDurable, + pending_ack = PA }) -> + ok = persist_acks(QName, IsDurable, none, AckTags, PA), PA1 = remove_acks(AckTags, PA), State #iv_state { pending_ack = PA1 }. -tx_publish(Txn, Msg, State = #iv_state { qname = QName }) -> +tx_publish(Txn, Msg, State = #iv_state { qname = QName, + durable = IsDurable }) -> Tx = #tx { pending_messages = Pubs } = lookup_tx(Txn), store_tx(Txn, Tx #tx { pending_messages = [Msg | Pubs] }), - ok = persist_message(Txn, QName, Msg), + ok = persist_message(QName, IsDurable, Txn, Msg), State. -tx_ack(Txn, AckTags, State = #iv_state { qname = QName, pending_ack = PA }) -> +tx_ack(Txn, AckTags, State = #iv_state { qname = QName, durable = IsDurable, + pending_ack = PA }) -> Tx = #tx { pending_acks = Acks } = lookup_tx(Txn), store_tx(Txn, Tx #tx { pending_acks = [AckTags | Acks] }), - ok = persist_acks(Txn, QName, AckTags, PA), + ok = persist_acks(QName, IsDurable, Txn, AckTags, PA), State. tx_rollback(Txn, State = #iv_state { qname = QName }) -> @@ -228,32 +239,33 @@ do_if_persistent(F, Txn, QName) -> %%---------------------------------------------------------------------------- -persist_message(_Txn, _QName, #basic_message { is_persistent = false }) -> - ok; -persist_message(Txn, QName, Msg) -> +persist_message(QName, true, Txn, Msg = #basic_message { + is_persistent = true }) -> Msg1 = Msg #basic_message { %% don't persist any recoverable decoded properties, %% rebuild from properties_bin on restore content = rabbit_binary_parser:clear_decoded_content( Msg #basic_message.content)}, persist_work(Txn, QName, - [{publish, Msg1, {QName, Msg1 #basic_message.guid}}]). + [{publish, Msg1, {QName, Msg1 #basic_message.guid}}]); +persist_message(_QName, _IsDurable, _Txn, _Msg) -> + ok. -persist_delivery(_QName, #basic_message { is_persistent = false }, - _IsDelivered) -> - ok; -persist_delivery(_QName, _Message, true) -> - ok; -persist_delivery(QName, #basic_message { guid = Guid }, _IsDelivered) -> - persist_work(none, QName, [{deliver, {QName, Guid}}]). +persist_delivery(QName, true, false, #basic_message { is_persistent = true, + guid = Guid }) -> + persist_work(none, QName, [{deliver, {QName, Guid}}]); +persist_delivery(_QName, _IsDurable, _IsDelivered, _Msg) -> + ok. -persist_acks(Txn, QName, AckTags, PA) -> +persist_acks(QName, true, Txn, AckTags, PA) -> persist_work(Txn, QName, [{ack, {QName, Guid}} || Guid <- AckTags, begin {ok, Msg} = dict:find(Guid, PA), Msg #basic_message.is_persistent - end]). + end]); +persist_acks(_QName, _IsDurable, _Txn, _AckTags, _PA) -> + ok. persist_work(_Txn,_QName, []) -> ok; diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 723b818b41..9a911ab15d 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -537,18 +537,24 @@ pid_to_string(Pid) when is_pid(Pid) -> %% inverse of above string_to_pid(Str) -> + Err = {error, {invalid_pid_syntax, Str}}, %% The \ before the trailing $ is only there to keep emacs %% font-lock from getting confused. case re:run(Str, "^<(.*)\\.([0-9]+)\\.([0-9]+)>\$", [{capture,all_but_first,list}]) of {match, [NodeStr, IdStr, SerStr]} -> - %% turn the triple into a pid - see pid_to_string - <<131,NodeEnc/binary>> = term_to_binary(list_to_atom(NodeStr)), + %% the NodeStr atom might be quoted, so we have to parse + %% it rather than doing a simple list_to_atom + NodeAtom = case erl_scan:string(NodeStr) of + {ok, [{atom, _, X}], _} -> X; + {error, _, _} -> throw(Err) + end, + <<131,NodeEnc/binary>> = term_to_binary(NodeAtom), Id = list_to_integer(IdStr), Ser = list_to_integer(SerStr), binary_to_term(<<131,103,NodeEnc/binary,Id:32,Ser:32,0:8>>); nomatch -> - throw({error, {invalid_pid_syntax, Str}}) + throw(Err) end. version_compare(A, B, lte) -> diff --git a/src/rabbit_plugin_activator.erl b/src/rabbit_plugin_activator.erl index 274981efed..ef3c5cc250 100644 --- a/src/rabbit_plugin_activator.erl +++ b/src/rabbit_plugin_activator.erl @@ -108,6 +108,7 @@ start() -> WApp == stdlib; WApp == kernel; WApp == sasl; + WApp == crypto; WApp == os_mon -> false; _ -> true end]), diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index d9e6de0553..8e7cd39f8a 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -52,13 +52,15 @@ -define(NORMAL_TIMEOUT, 3). -define(CLOSING_TIMEOUT, 1). -define(CHANNEL_TERMINATION_TIMEOUT, 3). +-define(SILENT_CLOSE_DELAY, 3). %% set to zero once QPid fix their negotiation -define(FRAME_MAX, 131072). -define(CHANNEL_MAX, 0). %--------------------------------------------------------------------------- --record(v1, {sock, connection, callback, recv_ref, connection_state}). +-record(v1, {sock, connection, callback, recv_ref, connection_state, + queue_collector}). -define(INFO_KEYS, [pid, address, port, peer_address, peer_port, @@ -236,6 +238,7 @@ start_connection(Parent, Deb, Sock, SockTransform) -> erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(), handshake_timeout), ProfilingValue = setup_profiling(), + {ok, Collector} = rabbit_reader_queue_collector:start_link(), try mainloop(Parent, Deb, switch_callback( #v1{sock = ClientSock, @@ -247,7 +250,8 @@ start_connection(Parent, Deb, Sock, SockTransform) -> client_properties = none}, callback = uninitialized_callback, recv_ref = none, - connection_state = pre_init}, + connection_state = pre_init, + queue_collector = Collector}, handshake, 8)) catch Ex -> (if Ex == connection_closed_abruptly -> @@ -265,7 +269,9 @@ start_connection(Parent, Deb, Sock, SockTransform) -> %% output to be sent, which results in unnecessary delays. %% %% gen_tcp:close(ClientSock), - teardown_profiling(ProfilingValue) + teardown_profiling(ProfilingValue), + rabbit_reader_queue_collector:shutdown(Collector), + rabbit_misc:unlink_and_capture_exit(Collector) end, done. @@ -428,11 +434,17 @@ wait_for_channel_termination(N, TimerRef) -> exit(channel_termination_timeout) end. -maybe_close(State = #v1{connection_state = closing}) -> +maybe_close(State = #v1{connection_state = closing, + queue_collector = Collector}) -> case all_channels() of - [] -> ok = send_on_channel0( - State#v1.sock, #'connection.close_ok'{}), - close_connection(State); + [] -> + %% Spec says "Exclusive queues may only be accessed by the current + %% connection, and are deleted when that connection closes." + %% This does not strictly imply synchrony, but in practice it seems + %% to be what people assume. + rabbit_reader_queue_collector:delete_all(Collector), + ok = send_on_channel0(State#v1.sock, #'connection.close_ok'{}), + close_connection(State); _ -> State end; maybe_close(State) -> @@ -576,7 +588,11 @@ handle_method0(MethodName, FieldsBin, State) -> end, case State#v1.connection_state of running -> send_exception(State, 0, CompleteReason); - Other -> throw({channel0_error, Other, CompleteReason}) + %% We don't trust the client at this point - force + %% them to wait for a bit so they can't DOS us with + %% repeated failed logins etc. + Other -> timer:sleep(?SILENT_CLOSE_DELAY * 1000), + throw({channel0_error, Other, CompleteReason}) end end. @@ -699,15 +715,16 @@ i(Item, #v1{}) -> %%-------------------------------------------------------------------------- -send_to_new_channel(Channel, AnalyzedFrame, State) -> +send_to_new_channel(Channel, AnalyzedFrame, + State = #v1{queue_collector = Collector}) -> #v1{sock = Sock, connection = #connection{ frame_max = FrameMax, user = #user{username = Username}, vhost = VHost}} = State, WriterPid = rabbit_writer:start(Sock, Channel, FrameMax), ChPid = rabbit_framing_channel:start_link( - fun rabbit_channel:start_link/5, - [Channel, self(), WriterPid, Username, VHost]), + fun rabbit_channel:start_link/6, + [Channel, self(), WriterPid, Username, VHost, Collector]), put({channel, Channel}, {chpid, ChPid}), put({chpid, ChPid}, {channel, Channel}), ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame). diff --git a/src/rabbit_reader_queue_collector.erl b/src/rabbit_reader_queue_collector.erl new file mode 100644 index 0000000000..841549e98f --- /dev/null +++ b/src/rabbit_reader_queue_collector.erl @@ -0,0 +1,108 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_reader_queue_collector). + +-behaviour(gen_server). + +-export([start_link/0, register_exclusive_queue/2, delete_all/1, shutdown/1]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-record(state, {exclusive_queues}). + +-include("rabbit.hrl"). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(start_link/0 :: () -> {'ok', pid()}). +-spec(register_exclusive_queue/2 :: (pid(), amqqueue()) -> 'ok'). +-spec(delete_all/1 :: (pid()) -> 'ok'). + +-endif. + +%%---------------------------------------------------------------------------- + +start_link() -> + gen_server:start_link(?MODULE, [], []). + +register_exclusive_queue(CollectorPid, Q) -> + gen_server:call(CollectorPid, {register_exclusive_queue, Q}, infinity). + +delete_all(CollectorPid) -> + gen_server:call(CollectorPid, delete_all, infinity). + +shutdown(CollectorPid) -> + gen_server:call(CollectorPid, shutdown, infinity). + +%%---------------------------------------------------------------------------- + +init([]) -> + {ok, #state{exclusive_queues = dict:new()}}. + +%%-------------------------------------------------------------------------- + +handle_call({register_exclusive_queue, Q}, _From, + State = #state{exclusive_queues = Queues}) -> + MonitorRef = erlang:monitor(process, Q#amqqueue.pid), + {reply, ok, + State#state{exclusive_queues = dict:store(MonitorRef, Q, Queues)}}; + +handle_call(delete_all, _From, + State = #state{exclusive_queues = ExclusiveQueues}) -> + [rabbit_misc:with_exit_handler( + fun() -> ok end, + fun() -> + erlang:demonitor(MonitorRef), + rabbit_amqqueue:delete(Q, false, false) + end) + || {MonitorRef, Q} <- dict:to_list(ExclusiveQueues)], + {reply, ok, State}; + +handle_call(shutdown, _From, State) -> + {stop, normal, ok, State}. + +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info({'DOWN', MonitorRef, process, _DownPid, _Reason}, + State = #state{exclusive_queues = ExclusiveQueues}) -> + {noreply, State#state{exclusive_queues = + dict:erase(MonitorRef, ExclusiveQueues)}}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 08b4cc75c8..5567cdbe36 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -788,7 +788,8 @@ test_user_management() -> test_server_status() -> %% create a few things so there is some useful information to list Writer = spawn(fun () -> receive shutdown -> ok end end), - Ch = rabbit_channel:start_link(1, self(), Writer, <<"user">>, <<"/">>), + Ch = rabbit_channel:start_link(1, self(), Writer, <<"user">>, <<"/">>, + self()), [Q, Q2] = [#amqqueue{} = rabbit_amqqueue:declare( rabbit_misc:r(<<"/">>, queue, Name), false, false, [], none) || @@ -894,10 +895,11 @@ test_delegates_async(SecondaryNode) -> passed. -make_responder(FMsg) -> +make_responder(FMsg) -> make_responder(FMsg, timeout). +make_responder(FMsg, Throw) -> fun() -> receive Msg -> FMsg(Msg) - after 1000 -> throw(timeout) + after 1000 -> throw(Throw) end end. @@ -931,17 +933,21 @@ test_delegates_sync(SecondaryNode) -> gen_server:reply(From, response) end), + BadResponder = make_responder(fun({'$gen_call', From, invoked}) -> + gen_server:reply(From, response) + end, bad_responder_died), + response = delegate:invoke(spawn(Responder), Sender), response = delegate:invoke(spawn(SecondaryNode, Responder), Sender), - must_exit(fun() -> delegate:invoke(spawn(Responder), BadSender) end), + must_exit(fun() -> delegate:invoke(spawn(BadResponder), BadSender) end), must_exit(fun() -> - delegate:invoke(spawn(SecondaryNode, Responder), BadSender) end), + delegate:invoke(spawn(SecondaryNode, BadResponder), BadSender) end), LocalGoodPids = spawn_responders(node(), Responder, 2), RemoteGoodPids = spawn_responders(SecondaryNode, Responder, 2), - LocalBadPids = spawn_responders(node(), Responder, 2), - RemoteBadPids = spawn_responders(SecondaryNode, Responder, 2), + LocalBadPids = spawn_responders(node(), BadResponder, 2), + RemoteBadPids = spawn_responders(SecondaryNode, BadResponder, 2), {GoodRes, []} = delegate:invoke(LocalGoodPids ++ RemoteGoodPids, Sender), true = lists:all(fun ({_, response}) -> true end, GoodRes), diff --git a/src/tcp_acceptor.erl b/src/tcp_acceptor.erl index 3b23daa5c1..cc4982c9cb 100644 --- a/src/tcp_acceptor.erl +++ b/src/tcp_acceptor.erl @@ -75,6 +75,13 @@ handle_info({inet_async, LSock, Ref, {ok, Sock}}, error_logger:info_msg("accepted TCP connection on ~s:~p from ~s:~p~n", [inet_parse:ntoa(Address), Port, inet_parse:ntoa(PeerAddress), PeerPort]), + %% In the event that somebody floods us with connections we can spew + %% the above message at error_logger faster than it can keep up. + %% So error_logger's mailbox grows unbounded until we eat all the + %% memory available and crash. So here's a meaningless synchronous call + %% to the underlying gen_event mechanism - when it returns the mailbox + %% is drained. + gen_event:which_handlers(error_logger), %% handle file_handle_cache:release_on_death(apply(M, F, A ++ [Sock])) catch {inet_error, Reason} -> |
