summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@lshift.net>2010-05-28 14:25:05 +0100
committerSimon MacMullen <simon@lshift.net>2010-05-28 14:25:05 +0100
commit4c035fae81a0d07529c8c18c115bfdcf02ef8514 (patch)
tree2e2d42b94d9c8a0a11455c40e63fb366b1f9b177
parentf5bb52f0b28e7c628290e7596ea866c0dcc2f8aa (diff)
parentab9024d762cfc1eb3da82d0a0293cbfd69d2bfea (diff)
downloadrabbitmq-server-git-4c035fae81a0d07529c8c18c115bfdcf02ef8514.tar.gz
Merge default into amqp_0_9_1.
-rw-r--r--Makefile10
-rw-r--r--docs/rabbitmqctl.1.xml2
-rw-r--r--include/rabbit.hrl3
-rw-r--r--packaging/RPMS/Fedora/rabbitmq-server.spec6
-rw-r--r--packaging/debs/Debian/debian/postrm.in19
-rw-r--r--packaging/debs/Debian/debian/rules2
-rw-r--r--packaging/macports/Makefile2
-rw-r--r--src/delegate.erl45
-rw-r--r--src/rabbit.erl14
-rw-r--r--src/rabbit_amqqueue.erl23
-rw-r--r--src/rabbit_amqqueue_process.erl106
-rw-r--r--src/rabbit_binary_generator.erl9
-rw-r--r--src/rabbit_channel.erl177
-rw-r--r--src/rabbit_invariable_queue.erl82
-rw-r--r--src/rabbit_misc.erl12
-rw-r--r--src/rabbit_plugin_activator.erl1
-rw-r--r--src/rabbit_reader.erl39
-rw-r--r--src/rabbit_reader_queue_collector.erl108
-rw-r--r--src/rabbit_tests.erl20
-rw-r--r--src/tcp_acceptor.erl7
20 files changed, 439 insertions, 248 deletions
diff --git a/Makefile b/Makefile
index 19eb66e8ee..f9ceeb833f 100644
--- a/Makefile
+++ b/Makefile
@@ -56,7 +56,7 @@ TARGET_SRC_DIR=dist/$(TARBALL_NAME)
SIBLING_CODEGEN_DIR=../rabbitmq-codegen/
AMQP_CODEGEN_DIR=$(shell [ -d $(SIBLING_CODEGEN_DIR) ] && echo $(SIBLING_CODEGEN_DIR) || echo codegen)
-AMQP_SPEC_JSON_PATH=$(AMQP_CODEGEN_DIR)/amqp-0.9.1.json
+AMQP_SPEC_JSON_FILES=$(AMQP_CODEGEN_DIR)/amqp-0.9.1.json
ERL_CALL=erl_call -sname $(RABBITMQ_NODENAME) -e
@@ -81,11 +81,11 @@ $(EBIN_DIR)/rabbit.app: $(EBIN_DIR)/rabbit_app.in $(BEAM_TARGETS) generate_app
$(EBIN_DIR)/%.beam:
erlc $(ERLC_OPTS) -pa $(EBIN_DIR) $<
-$(INCLUDE_DIR)/rabbit_framing.hrl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_PATH)
- $(PYTHON) codegen.py header $(AMQP_SPEC_JSON_PATH) $@
+$(INCLUDE_DIR)/rabbit_framing.hrl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_FILES)
+ $(PYTHON) codegen.py header $(AMQP_SPEC_JSON_FILES) $@
-$(SOURCE_DIR)/rabbit_framing.erl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_PATH)
- $(PYTHON) codegen.py body $(AMQP_SPEC_JSON_PATH) $@
+$(SOURCE_DIR)/rabbit_framing.erl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_FILES)
+ $(PYTHON) codegen.py body $(AMQP_SPEC_JSON_FILES) $@
dialyze: $(BEAM_TARGETS) $(BASIC_PLT)
$(ERL_EBIN) -eval \
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index 5e2668c1a6..a2038cf0e9 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -271,7 +271,7 @@
<variablelist>
<varlistentry>
- <term><cmdsynopsis><command>cluster</command> <arg choice="req"><replaceable>clusternode</replaceable></arg></cmdsynopsis></term>
+ <term><cmdsynopsis><command>cluster</command> <arg choice="req" role="usage-option-list"><replaceable>clusternode</replaceable> ...</arg></cmdsynopsis></term>
<listitem>
<variablelist>
<varlistentry>
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index c2dad74475..d4327980be 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -51,7 +51,8 @@
-record(exchange, {name, type, durable, arguments}).
--record(amqqueue, {name, durable, auto_delete, exclusive_owner = none, arguments, pid}).
+-record(amqqueue, {name, durable, auto_delete, exclusive_owner = none,
+ arguments, pid}).
%% mnesia doesn't like unary records, so we add a dummy 'value' field
-record(route, {binding, value = const}).
diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec
index 6926261f79..00066a15f7 100644
--- a/packaging/RPMS/Fedora/rabbitmq-server.spec
+++ b/packaging/RPMS/Fedora/rabbitmq-server.spec
@@ -107,6 +107,12 @@ if [ $1 = 0 ]; then
# Leave rabbitmq user and group
fi
+# Clean out plugin activation state, both on uninstall and upgrade
+rm -rf %{_rabbit_erllibdir}/priv
+for ext in rel script boot ; do
+ rm -f %{_rabbit_erllibdir}/ebin/rabbit.$ext
+done
+
%files -f ../%{name}.files
%defattr(-,root,root,-)
%attr(0750, rabbitmq, rabbitmq) %dir %{_localstatedir}/lib/rabbitmq
diff --git a/packaging/debs/Debian/debian/postrm.in b/packaging/debs/Debian/debian/postrm.in
index bfcf1f530e..5290de9b17 100644
--- a/packaging/debs/Debian/debian/postrm.in
+++ b/packaging/debs/Debian/debian/postrm.in
@@ -18,6 +18,13 @@ set -e
# for details, see http://www.debian.org/doc/debian-policy/ or
# the debian-policy package
+remove_plugin_traces() {
+ # Remove traces of plugins
+ rm -rf @RABBIT_LIB@/priv @RABBIT_LIB@/plugins
+ for ext in rel script boot ; do
+ rm -f @RABBIT_LIB@/ebin/rabbit.$ext
+ done
+}
case "$1" in
purge)
@@ -34,11 +41,7 @@ case "$1" in
if [ -d /etc/rabbitmq ]; then
rm -r /etc/rabbitmq
fi
- # Remove traces of plugins
- rm -rf @RABBIT_LIB@/priv @RABBIT_LIB@/plugins
- for ext in rel script boot ; do
- rm -f @RABBIT_LIB@/ebin/rabbit.$ext
- done
+ remove_plugin_traces
if getent passwd rabbitmq >/dev/null; then
# Stop epmd if run by the rabbitmq user
pkill -u rabbitmq epmd || :
@@ -50,7 +53,11 @@ case "$1" in
fi
;;
- remove|upgrade|failed-upgrade|abort-install|abort-upgrade|disappear)
+ remove|upgrade)
+ remove_plugin_traces
+ ;;
+
+ failed-upgrade|abort-install|abort-upgrade|disappear)
;;
*)
diff --git a/packaging/debs/Debian/debian/rules b/packaging/debs/Debian/debian/rules
index 45659602f5..1916651426 100644
--- a/packaging/debs/Debian/debian/rules
+++ b/packaging/debs/Debian/debian/rules
@@ -13,7 +13,7 @@ DOCDIR=$(DEB_DESTDIR)usr/share/doc/rabbitmq-server/
install/rabbitmq-server::
mkdir -p $(DOCDIR)
- rm $(RABBIT_LIB)LICENSE*
+ rm $(RABBIT_LIB)LICENSE* $(RABBIT_LIB)INSTALL*
for script in rabbitmqctl rabbitmq-server rabbitmq-multi; do \
install -p -D -m 0755 debian/rabbitmq-script-wrapper $(DEB_DESTDIR)usr/sbin/$$script; \
done
diff --git a/packaging/macports/Makefile b/packaging/macports/Makefile
index 0ef7dd5e73..4ad4c30b2c 100644
--- a/packaging/macports/Makefile
+++ b/packaging/macports/Makefile
@@ -39,7 +39,7 @@ macports: dirs $(DEST)/Portfile
$(DEST)/files/rabbitmq-script-wrapper
cp patch-org.macports.rabbitmq-server.plist.diff $(DEST)/files
if [ -n "$(MACPORTS_USERHOST)" ] ; then \
- tar cf - -C $(MACPORTS_DIR) . | ssh $(SSH_OPTS) lshift@macrabbit ' \
+ tar cf - -C $(MACPORTS_DIR) . | ssh $(SSH_OPTS) $(MACPORTS_USERHOST) ' \
d="/tmp/mkportindex.$$$$" ; \
mkdir $$d \
&& cd $$d \
diff --git a/src/delegate.erl b/src/delegate.erl
index 12eb814f8f..98353453ff 100644
--- a/src/delegate.erl
+++ b/src/delegate.erl
@@ -63,7 +63,7 @@ start_link(Hash) ->
gen_server2:start_link({local, server(Hash)}, ?MODULE, [], []).
invoke(Pid, Fun) when is_pid(Pid) ->
- [Res] = invoke_per_node([{node(Pid), [Pid]}], Fun),
+ [Res] = invoke_per_node(split_delegate_per_node([Pid]), Fun),
case Res of
{ok, Result, _} ->
Result;
@@ -83,7 +83,7 @@ invoke(Pids, Fun) when is_list(Pids) ->
invoke_per_node(split_delegate_per_node(Pids), Fun)).
invoke_no_result(Pid, Fun) when is_pid(Pid) ->
- invoke_no_result_per_node([{node(Pid), [Pid]}], Fun),
+ invoke_no_result_per_node(split_delegate_per_node([Pid]), Fun),
ok;
invoke_no_result(Pids, Fun) when is_list(Pids) ->
@@ -99,32 +99,37 @@ internal_cast(Node, Thunk) when is_atom(Node) ->
gen_server2:cast({remote_server(Node), Node}, {thunk, Thunk}).
split_delegate_per_node(Pids) ->
- orddict:to_list(
- lists:foldl(
- fun (Pid, D) ->
- orddict:update(node(Pid),
- fun (Pids1) -> [Pid | Pids1] end,
- [Pid], D)
- end,
- orddict:new(), Pids)).
+ LocalNode = node(),
+ {Local, Remote} =
+ lists:foldl(
+ fun (Pid, {L, D}) ->
+ Node = node(Pid),
+ case Node of
+ LocalNode -> {[Pid|L], D};
+ _ -> {L, orddict:append(Node, Pid, D)}
+ end
+ end,
+ {[], orddict:new()}, Pids),
+ {Local, orddict:to_list(Remote)}.
-invoke_per_node([{Node, Pids}], Fun) when Node == node() ->
- safe_invoke(Pids, Fun);
invoke_per_node(NodePids, Fun) ->
lists:append(delegate_per_node(NodePids, Fun, fun internal_call/2)).
-invoke_no_result_per_node([{Node, Pids}], Fun) when Node == node() ->
- %% This is not actually async! However, in practice Fun will
- %% always be something that does a gen_server:cast or similar, so
- %% I don't think it's a problem unless someone misuses this
- %% function. Making this *actually* async would be painful as we
- %% can't spawn at this point or we break effect ordering.
- safe_invoke(Pids, Fun);
invoke_no_result_per_node(NodePids, Fun) ->
delegate_per_node(NodePids, Fun, fun internal_cast/2),
ok.
-delegate_per_node(NodePids, Fun, DelegateFun) ->
+delegate_per_node({LocalPids, NodePids}, Fun, DelegateFun) ->
+ %% In the case where DelegateFun is internal_cast, the safe_invoke
+ %% is not actually async! However, in practice Fun will always be
+ %% something that does a gen_server:cast or similar, so I don't
+ %% think it's a problem unless someone misuses this
+ %% function. Making this *actually* async would be painful as we
+ %% can't spawn at this point or we break effect ordering.
+ [safe_invoke(LocalPids, Fun)|
+ delegate_per_remote_node(NodePids, Fun, DelegateFun)].
+
+delegate_per_remote_node(NodePids, Fun, DelegateFun) ->
Self = self(),
%% Note that this is unsafe if the Fun requires reentrancy to the
%% local_server. I.e. if self() == local_server(Node) then we'll
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 525558d88f..09a190148f 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -299,6 +299,18 @@ run_boot_step({StepName, Attributes}) ->
ok
end.
+module_attributes(Module) ->
+ case catch Module:module_info(attributes) of
+ {'EXIT', {undef, [{Module, module_info, _} | _]}} ->
+ io:format("WARNING: module ~p not found, so not scanned for boot steps.~n",
+ [Module]),
+ [];
+ {'EXIT', Reason} ->
+ exit(Reason);
+ V ->
+ V
+ end.
+
boot_steps() ->
AllApps = [App || {App, _, _} <- application:loaded_applications()],
Modules = lists:usort(
@@ -310,7 +322,7 @@ boot_steps() ->
lists:flatmap(fun (Module) ->
[{StepName, Attributes}
|| {rabbit_boot_step, [{StepName, Attributes}]}
- <- Module:module_info(attributes)]
+ <- module_attributes(Module)]
end, Modules),
sort_boot_steps(UnsortedSteps).
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index b237be8a48..08241027e5 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -65,8 +65,8 @@
'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}).
-spec(start/0 :: () -> 'ok').
--spec(declare/5 :: (queue_name(), boolean(), boolean(), amqp_table(), maybe(pid())) ->
- amqqueue()).
+-spec(declare/5 :: (queue_name(), boolean(), boolean(), amqp_table(),
+ maybe(pid())) -> amqqueue()).
-spec(lookup/1 :: (queue_name()) -> {'ok', amqqueue()} | not_found()).
-spec(with/2 :: (queue_name(), qfun(A)) -> A | not_found()).
-spec(with_or_die/2 :: (queue_name(), qfun(A)) -> A).
@@ -101,8 +101,7 @@
-spec(basic_consume/7 ::
(amqqueue(), boolean(), pid(), pid() | 'undefined', ctag(),
boolean(), any()) ->
- 'ok' | {'error', 'queue_owned_by_another_connection' |
- 'exclusive_consume_unavailable'}).
+ 'ok' | {'error', 'exclusive_consume_unavailable'}).
-spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok').
-spec(notify_sent/2 :: (pid(), pid()) -> 'ok').
-spec(unblock/2 :: (pid(), pid()) -> 'ok').
@@ -145,7 +144,7 @@ find_durable_queues() ->
recover_durable_queues(DurableQueues) ->
Qs = [start_queue_process(Q) || Q <- DurableQueues],
[Q || Q <- Qs, gen_server2:call(Q#amqqueue.pid, {init, true}) == Q].
-
+
declare(QueueName, Durable, AutoDelete, Args, Owner) ->
Q = start_queue_process(#amqqueue{name = QueueName,
durable = Durable,
@@ -320,10 +319,12 @@ flush_all(QPids, ChPid) ->
delegate:invoke_no_result(
QPids, fun (QPid) -> gen_server2:cast(QPid, {flush, ChPid}) end).
-internal_delete2(QueueName) ->
+internal_delete1(QueueName) ->
ok = mnesia:delete({rabbit_queue, QueueName}),
ok = mnesia:delete({rabbit_durable_queue, QueueName}),
- %% this is last because it returns a post-transaction callback
+ %% we want to execute some things, as
+ %% decided by rabbit_exchange, after the
+ %% transaction.
rabbit_exchange:delete_queue_bindings(QueueName).
internal_delete(QueueName) ->
@@ -332,14 +333,10 @@ internal_delete(QueueName) ->
fun () ->
case mnesia:wread({rabbit_queue, QueueName}) of
[] -> {error, not_found};
- [_] -> internal_delete2(QueueName)
+ [_] -> internal_delete1(QueueName)
end
end) of
- Err = {error, _} ->
- Err;
- %% we want to execute some things, as
- %% decided by rabbit_exchange, after the
- %% transaction.
+ Err = {error, _} -> Err;
PostHook ->
PostHook(),
ok
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index ff1b8f1baa..e7c926643f 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -132,6 +132,23 @@ code_change(_OldVsn, State, _Extra) ->
%%----------------------------------------------------------------------------
+declare(Recover, From,
+ State = #q{q = Q = #amqqueue{name = QName, durable = IsDurable},
+ backing_queue = BQ, backing_queue_state = undefined}) ->
+ case rabbit_amqqueue:internal_declare(Q, Recover) of
+ not_found -> {stop, normal, not_found, State};
+ Q -> gen_server2:reply(From, Q),
+ ok = file_handle_cache:register_callback(
+ rabbit_amqqueue, set_maximum_since_use,
+ [self()]),
+ ok = rabbit_memory_monitor:register(
+ self(), {rabbit_amqqueue,
+ set_ram_duration_target, [self()]}),
+ BQS = BQ:init(QName, IsDurable, Recover),
+ noreply(State#q{backing_queue_state = BQS});
+ Q1 -> {stop, normal, Q1, State}
+ end.
+
terminate_shutdown(Fun, State) ->
State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
stop_sync_timer(stop_rate_timer(State)),
@@ -484,8 +501,8 @@ i(pid, _) ->
self();
i(owner_pid, #q{q = #amqqueue{exclusive_owner = none}}) ->
'';
-i(owner_pid, #q{q = #amqqueue{exclusive_owner = ReaderPid}}) ->
- ReaderPid;
+i(owner_pid, #q{q = #amqqueue{exclusive_owner = ExclusiveOwner}}) ->
+ ExclusiveOwner;
i(exclusive_consumer_pid, #q{exclusive_consumer = none}) ->
'';
i(exclusive_consumer_pid, #q{exclusive_consumer = {ChPid, _ConsumerTag}}) ->
@@ -514,50 +531,24 @@ i(Item, _) ->
%---------------------------------------------------------------------------
handle_call({init, Recover}, From,
- State = #q{q = Q = #amqqueue{name = QName, durable = IsDurable,
- exclusive_owner = ExclusiveOwner},
- backing_queue = BQ, backing_queue_state = undefined}) ->
- Declare =
- fun() ->
- case rabbit_amqqueue:internal_declare(Q, Recover) of
- not_found ->
- {stop, normal, not_found, State};
- Q ->
- gen_server2:reply(From, Q),
- ok = file_handle_cache:register_callback(
- rabbit_amqqueue, set_maximum_since_use,
- [self()]),
- ok = rabbit_memory_monitor:register(
- self(), {rabbit_amqqueue,
- set_ram_duration_target, [self()]}),
- noreply(
- State#q{backing_queue_state =
- BQ:init(QName, IsDurable, Recover)});
- Q1 ->
- {stop, normal, Q1, State}
- end
- end,
+ State = #q{q = #amqqueue{exclusive_owner = none}}) ->
+ declare(Recover, From, State);
- case ExclusiveOwner of
- none ->
- Declare();
- Owner ->
- case rpc:call(node(Owner), erlang, is_process_alive, [Owner]) of
- true ->
- erlang:monitor(process, Owner),
- Declare();
- _ ->
- case Recover of
- true -> ok;
- _ -> rabbit_log:warning(
- "Queue ~p exclusive owner went away~n",
- [QName])
- end,
- %% Rely on terminate to delete the queue.
- {stop, normal, not_found,
- State#q{backing_queue_state =
- BQ:init(QName, IsDurable, Recover)}}
- end
+handle_call({init, Recover}, From,
+ State = #q{q = #amqqueue{exclusive_owner = Owner}}) ->
+ case rpc:call(node(Owner), erlang, is_process_alive, [Owner]) of
+ true -> erlang:monitor(process, Owner),
+ declare(Recover, From, State);
+ _ -> #q{q = #amqqueue{name = QName, durable = IsDurable},
+ backing_queue = BQ, backing_queue_state = undefined} = State,
+ case Recover of
+ true -> ok;
+ _ -> rabbit_log:warning(
+ "Queue ~p exclusive owner went away~n", [QName])
+ end,
+ BQS = BQ:init(QName, IsDurable, Recover),
+ %% Rely on terminate to delete the queue.
+ {stop, normal, not_found, State#q{backing_queue_state = BQS}}
end;
handle_call(info, _From, State) ->
@@ -642,16 +633,15 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid,
ok ->
C = #cr{consumer_count = ConsumerCount} = ch_record(ChPid),
Consumer = #consumer{tag = ConsumerTag,
- ack_required = not NoAck},
+ ack_required = not NoAck},
store_ch_record(C#cr{consumer_count = ConsumerCount +1,
limiter_pid = LimiterPid}),
- case ConsumerCount of
- 0 -> rabbit_limiter:register(LimiterPid, self());
- _ -> ok
- end,
- ExclusiveConsumer = case ExclusiveConsume of
- true -> {ChPid, ConsumerTag};
- false -> ExistingHolder
+ ok = case ConsumerCount of
+ 0 -> rabbit_limiter:register(LimiterPid, self());
+ _ -> ok
+ end,
+ ExclusiveConsumer = if ExclusiveConsume -> {ChPid, ConsumerTag};
+ true -> ExistingHolder
end,
State1 = State#q{has_had_consumers = true,
exclusive_consumer = ExclusiveConsumer},
@@ -741,7 +731,6 @@ handle_call({requeue, AckTags, ChPid}, _From, State) ->
handle_call({maybe_run_queue_via_backing_queue, Fun}, _From, State) ->
reply(ok, maybe_run_queue_via_backing_queue(Fun, State)).
-
handle_cast({deliver, Txn, Message, ChPid}, State) ->
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
{_Delivered, NewState} = deliver_or_enqueue(Txn, ChPid, Message, State),
@@ -817,8 +806,13 @@ handle_cast({set_maximum_since_use, Age}, State) ->
noreply(State).
handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason},
- State = #q{q= #amqqueue{ exclusive_owner = DownPid}}) ->
- %% Exclusively owned queues must disappear with their owner.
+ State = #q{q = #amqqueue{exclusive_owner = DownPid}}) ->
+ %% Exclusively owned queues must disappear with their owner. In
+ %% the case of clean shutdown we delete the queue synchronously in
+ %% the reader - although not required by the spec this seems to
+ %% match what people expect (see bug 21824). However we need this
+ %% monitor-and-async- delete in case the connection goes away
+ %% unexpectedly.
{stop, normal, State};
handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) ->
case handle_ch_down(DownPid, State) of
diff --git a/src/rabbit_binary_generator.erl b/src/rabbit_binary_generator.erl
index ed84373585..27a1275a31 100644
--- a/src/rabbit_binary_generator.erl
+++ b/src/rabbit_binary_generator.erl
@@ -118,10 +118,11 @@ build_content_frames(SizeAcc, FramesAcc, FragSizeRem, FragAcc,
[Frag | Frags], BodyPayloadMax, ChannelInt) ->
Size = size(Frag),
{NewFragSizeRem, NewFragAcc, NewFrags} =
- case Size =< FragSizeRem of
- true -> {FragSizeRem - Size, [Frag | FragAcc], Frags};
- false -> <<Head:FragSizeRem/binary, Tail/binary>> = Frag,
- {0, [Head | FragAcc], [Tail | Frags]}
+ if Size == 0 -> {FragSizeRem, FragAcc, Frags};
+ Size =< FragSizeRem -> {FragSizeRem - Size, [Frag | FragAcc], Frags};
+ true -> <<Head:FragSizeRem/binary, Tail/binary>> =
+ Frag,
+ {0, [Head | FragAcc], [Tail | Frags]}
end,
build_content_frames(SizeAcc, FramesAcc, NewFragSizeRem, NewFragAcc,
NewFrags, BodyPayloadMax, ChannelInt).
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index e29abb8544..57d29d5ed9 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -35,7 +35,7 @@
-behaviour(gen_server2).
--export([start_link/5, do/2, do/3, shutdown/1]).
+-export([start_link/6, do/2, do/3, shutdown/1]).
-export([send_command/2, deliver/4, conserve_memory/2, flushed/2]).
-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]).
@@ -46,7 +46,7 @@
transaction_id, tx_participants, next_tag,
uncommitted_ack_q, unacked_message_q,
username, virtual_host, most_recently_declared_queue,
- consumer_mapping, blocking}).
+ consumer_mapping, blocking, queue_collector_pid}).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
@@ -66,8 +66,8 @@
-ifdef(use_specs).
--spec(start_link/5 ::
- (channel_number(), pid(), pid(), username(), vhost()) -> pid()).
+-spec(start_link/6 ::
+ (channel_number(), pid(), pid(), username(), vhost(), pid()) -> pid()).
-spec(do/2 :: (pid(), amqp_method()) -> 'ok').
-spec(do/3 :: (pid(), amqp_method(), maybe(content())) -> 'ok').
-spec(shutdown/1 :: (pid()) -> 'ok').
@@ -86,10 +86,10 @@
%%----------------------------------------------------------------------------
-start_link(Channel, ReaderPid, WriterPid, Username, VHost) ->
+start_link(Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid) ->
{ok, Pid} = gen_server2:start_link(
?MODULE, [Channel, ReaderPid, WriterPid,
- Username, VHost], []),
+ Username, VHost, CollectorPid], []),
Pid.
do(Pid, Method) ->
@@ -135,10 +135,9 @@ info_all(Items) ->
%%---------------------------------------------------------------------------
-init([Channel, ReaderPid, WriterPid, Username, VHost]) ->
+init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid]) ->
process_flag(trap_exit, true),
link(WriterPid),
- rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}),
ok = pg_local:join(rabbit_channels, self()),
{ok, #ch{state = starting,
channel = Channel,
@@ -154,7 +153,8 @@ init([Channel, ReaderPid, WriterPid, Username, VHost]) ->
virtual_host = VHost,
most_recently_declared_queue = <<>>,
consumer_mapping = dict:new(),
- blocking = dict:new()},
+ blocking = dict:new(),
+ queue_collector_pid = CollectorPid},
hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -299,13 +299,20 @@ check_write_permitted(Resource, #ch{ username = Username}) ->
check_read_permitted(Resource, #ch{ username = Username}) ->
check_resource_access(Username, Resource, read).
-check_queue_exclusivity(ReaderPid, Q) ->
- case Q of
- #amqqueue{ exclusive_owner = none} -> Q;
- #amqqueue{ exclusive_owner = ReaderPid } -> Q;
- _ -> rabbit_misc:protocol_error(resource_locked,
- "cannot obtain exclusive access to locked ~s",
- [rabbit_misc:rs(Q#amqqueue.name)])
+with_exclusive_access_or_die(QName, ReaderPid, F) ->
+ case rabbit_amqqueue:with_or_die(
+ QName, fun (Q = #amqqueue{exclusive_owner = Owner})
+ when Owner =:= none orelse Owner =:= ReaderPid ->
+ F(Q);
+ (_) ->
+ {error, wrong_exclusive_owner}
+ end) of
+ {error, wrong_exclusive_owner} ->
+ rabbit_misc:protocol_error(
+ resource_locked, "cannot obtain exclusive access to locked ~s",
+ [rabbit_misc:rs(QName)]);
+ Other ->
+ Other
end.
expand_queue_name_shortcut(<<>>, #ch{ most_recently_declared_queue = <<>> }) ->
@@ -362,6 +369,7 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) ->
end.
handle_method(#'channel.open'{}, _, State = #ch{state = starting}) ->
+ rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}),
{reply, #'channel.open_ok'{}, State#ch{state = running}};
handle_method(#'channel.open'{}, _, _State) ->
@@ -446,10 +454,10 @@ handle_method(#'basic.get'{queue = QueueNameBin,
next_tag = DeliveryTag }) ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
check_read_permitted(QueueName, State),
- case rabbit_amqqueue:with_or_die(
+ case with_exclusive_access_or_die(
QueueName,
+ ReaderPid,
fun (Q) ->
- check_queue_exclusivity(ReaderPid, Q),
rabbit_amqqueue:basic_get(Q, self(), NoAck)
end) of
{ok, MessageCount,
@@ -493,10 +501,9 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
%% In order to ensure that the consume_ok gets sent before
%% any messages are sent to the consumer, we get the queue
%% process to send the consume_ok on our behalf.
- case rabbit_amqqueue:with_or_die(
- QueueName,
+ case with_exclusive_access_or_die(
+ QueueName, ReaderPid,
fun (Q) ->
- check_queue_exclusivity(ReaderPid, Q),
rabbit_amqqueue:basic_consume(
Q, NoAck, self(), LimiterPid,
ActualConsumerTag, ExclusiveConsume,
@@ -684,46 +691,53 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin,
return_ok(State, NoWait, #'exchange.delete_ok'{})
end;
-handle_method(#'queue.declare'{queue = QueueNameBin,
- passive = false,
- durable = Durable,
- exclusive = ExclusiveDeclare,
+handle_method(#'queue.declare'{queue = QueueNameBin,
+ passive = false,
+ durable = Durable,
+ exclusive = ExclusiveDeclare,
auto_delete = AutoDelete,
- nowait = NoWait,
- arguments = Args},
- _, State = #ch { virtual_host = VHostPath,
- reader_pid = ReaderPid }) ->
+ nowait = NoWait,
+ arguments = Args},
+ _, State = #ch{virtual_host = VHostPath,
+ reader_pid = ReaderPid,
+ queue_collector_pid = CollectorPid}) ->
Owner = case ExclusiveDeclare of
true -> ReaderPid;
false -> none
end,
%% We use this in both branches, because queue_declare may yet return an
%% existing queue.
- Finish =
- fun(Q) ->
- case Q of
- %% "equivalent" rule. NB: we don't pay attention to
- %% anything in the arguments table, so for the sake of the
- %% "equivalent" rule, all tables of arguments are
- %% semantically equivalant.
- Matched = #amqqueue{name = QueueName,
- durable = Durable, %% i.e., as supplied
- exclusive_owner = Owner,
- auto_delete = AutoDelete %% i.e,. as supplied
- } ->
- check_configure_permitted(QueueName, State),
- Matched;
- %% exclusivity trumps non-equivalence arbitrarily
- #amqqueue{name = QueueName, exclusive_owner = ExclusiveOwner}
- when ExclusiveOwner =/= Owner ->
- rabbit_misc:protocol_error(resource_locked,
- "cannot obtain exclusive access to locked ~s",
- [rabbit_misc:rs(QueueName)]);
- #amqqueue{name = QueueName} ->
- rabbit_misc:protocol_error(channel_error,
- "parameters for ~s not equivalent",
- [rabbit_misc:rs(QueueName)])
- end
+ Finish =
+ fun (#amqqueue{name = QueueName, exclusive_owner = Owner1,
+ durable = Durable1, auto_delete = AutoDelete1} = Q)
+ %% "equivalent" rule. NB: we don't pay attention to
+ %% anything in the arguments table, so for the sake of the
+ %% "equivalent" rule, all tables of arguments are
+ %% semantically equivalant.
+ when Owner =:= Owner1,
+ Durable =:= Durable1,
+ AutoDelete =:= AutoDelete1 ->
+ check_configure_permitted(QueueName, State),
+ %% We need to notify the reader within the channel
+ %% process so that we can be sure there are no
+ %% outstanding exclusive queues being declared as the
+ %% connection shuts down.
+ case Owner of
+ none -> ok;
+ _ -> ok = rabbit_reader_queue_collector:register_exclusive_queue(CollectorPid, Q)
+ end,
+ Q;
+ (#amqqueue{name = QueueName, exclusive_owner = Owner1})
+ when Owner =:= Owner1 ->
+ rabbit_misc:protocol_error(channel_error,
+ "parameters for ~s not equivalent",
+ [rabbit_misc:rs(QueueName)]);
+ (#amqqueue{name = QueueName}) ->
+ %% exclusivity trumps non-equivalence arbitrarily
+ rabbit_misc:protocol_error(
+ resource_locked,
+ "cannot obtain exclusive access to locked ~s",
+ [rabbit_misc:rs(QueueName)])
end,
Q = case rabbit_amqqueue:with(
rabbit_misc:r(VHostPath, queue, QueueNameBin),
@@ -735,37 +749,33 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
Other -> check_name('queue', Other)
end,
QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin),
- check_configure_permitted(QueueName, State),
- Finish(rabbit_amqqueue:declare(QueueName, Durable, AutoDelete, Args, Owner));
- Found -> Found
+ Finish(rabbit_amqqueue:declare(QueueName, Durable, AutoDelete,
+ Args, Owner));
+ #amqqueue{} = Other ->
+ Other
end,
return_queue_declare_ok(State, NoWait, Q);
-handle_method(#'queue.declare'{queue = QueueNameBin,
+handle_method(#'queue.declare'{queue = QueueNameBin,
passive = true,
- nowait = NoWait},
- _, State = #ch{ virtual_host = VHostPath,
- reader_pid = ReaderPid }) ->
+ nowait = NoWait},
+ _, State = #ch{virtual_host = VHostPath,
+ reader_pid = ReaderPid}) ->
QueueName = rabbit_misc:r(VHostPath, queue, QueueNameBin),
check_configure_permitted(QueueName, State),
- CheckExclusive = fun(Q) -> check_queue_exclusivity(ReaderPid, Q) end,
- Q = rabbit_amqqueue:with_or_die(QueueName, CheckExclusive),
+ Q = with_exclusive_access_or_die(QueueName, ReaderPid, fun (Q) -> Q end),
return_queue_declare_ok(State, NoWait, Q);
handle_method(#'queue.delete'{queue = QueueNameBin,
if_unused = IfUnused,
if_empty = IfEmpty,
- nowait = NoWait
- },
- _, State = #ch{ reader_pid = ReaderPid }) ->
+ nowait = NoWait},
+ _, State = #ch{reader_pid = ReaderPid}) ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
check_configure_permitted(QueueName, State),
- case rabbit_amqqueue:with_or_die(
- QueueName,
- fun (Q) ->
- check_queue_exclusivity(ReaderPid, Q),
- rabbit_amqqueue:delete(Q, IfUnused, IfEmpty)
- end) of
+ case with_exclusive_access_or_die(
+ QueueName, ReaderPid,
+ fun (Q) -> rabbit_amqqueue:delete(Q, IfUnused, IfEmpty) end) of
{error, in_use} ->
rabbit_misc:protocol_error(
precondition_failed, "~s in use", [rabbit_misc:rs(QueueName)]);
@@ -774,8 +784,7 @@ handle_method(#'queue.delete'{queue = QueueNameBin,
precondition_failed, "~s not empty", [rabbit_misc:rs(QueueName)]);
{ok, PurgedMessageCount} ->
return_ok(State, NoWait,
- #'queue.delete_ok'{
- message_count = PurgedMessageCount})
+ #'queue.delete_ok'{message_count = PurgedMessageCount})
end;
handle_method(#'queue.bind'{queue = QueueNameBin,
@@ -797,15 +806,12 @@ handle_method(#'queue.unbind'{queue = QueueNameBin,
handle_method(#'queue.purge'{queue = QueueNameBin,
nowait = NoWait},
- _, State = #ch{ reader_pid = ReaderPid }) ->
+ _, State = #ch{reader_pid = ReaderPid}) ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
check_read_permitted(QueueName, State),
- {ok, PurgedMessageCount} = rabbit_amqqueue:with_or_die(
- QueueName,
- fun (Q) ->
- check_queue_exclusivity(ReaderPid, Q),
- rabbit_amqqueue:purge(Q)
- end),
+ {ok, PurgedMessageCount} = with_exclusive_access_or_die(
+ QueueName, ReaderPid,
+ fun (Q) -> rabbit_amqqueue:purge(Q) end),
return_ok(State, NoWait,
#'queue.purge_ok'{message_count = PurgedMessageCount});
@@ -881,8 +887,13 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments,
State),
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
check_read_permitted(ExchangeName, State),
- CheckExclusive = fun(_X, Q) -> check_queue_exclusivity(ReaderPid, Q) end,
- case Fun(ExchangeName, QueueName, ActualRoutingKey, Arguments, CheckExclusive) of
+ CheckExclusive =
+ fun(_X, Q) ->
+ with_exclusive_access_or_die(Q#amqqueue.name,
+ ReaderPid, fun(_Q1)-> ok end)
+ end,
+ case Fun(ExchangeName, QueueName, ActualRoutingKey, Arguments,
+ CheckExclusive) of
{error, exchange_not_found} ->
rabbit_misc:not_found(ExchangeName);
{error, queue_not_found} ->
diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl
index b4fd91560f..a7ca20c80b 100644
--- a/src/rabbit_invariable_queue.erl
+++ b/src/rabbit_invariable_queue.erl
@@ -43,7 +43,7 @@
-include("rabbit.hrl").
--record(iv_state, { queue, qname, len, pending_ack }).
+-record(iv_state, { queue, qname, durable, len, pending_ack }).
-record(tx, { pending_messages, pending_acks, is_persistent }).
-ifdef(use_specs).
@@ -66,18 +66,23 @@ init(QName, IsDurable, Recover) ->
true -> rabbit_persister:queue_content(QName);
false -> []
end),
- #iv_state { queue = Q, qname = QName, len = queue:len(Q),
+ #iv_state { queue = Q,
+ qname = QName,
+ durable = IsDurable,
+ len = queue:len(Q),
pending_ack = dict:new() }.
terminate(State) ->
State #iv_state { queue = queue:new(), len = 0, pending_ack = dict:new() }.
-delete_and_terminate(State = #iv_state { qname = QName, pending_ack = PA }) ->
- ok = persist_acks(none, QName, dict:fetch_keys(PA), PA),
+delete_and_terminate(State = #iv_state { qname = QName, durable = IsDurable,
+ pending_ack = PA }) ->
+ ok = persist_acks(QName, IsDurable, none, dict:fetch_keys(PA), PA),
{_PLen, State1} = purge(State),
terminate(State1).
-purge(State = #iv_state { len = Len, queue = Q, qname = QName }) ->
+purge(State = #iv_state { queue = Q, qname = QName, durable = IsDurable,
+ len = Len }) ->
%% We do not purge messages pending acks.
{AckTags, PA} =
rabbit_misc:queue_fold(
@@ -85,57 +90,63 @@ purge(State = #iv_state { len = Len, queue = Q, qname = QName }) ->
Acc;
({Msg = #basic_message { guid = Guid }, IsDelivered},
{AckTagsN, PAN}) ->
- ok = persist_delivery(QName, Msg, IsDelivered),
+ ok = persist_delivery(QName, IsDurable, IsDelivered, Msg),
{[Guid | AckTagsN], dict:store(Guid, Msg, PAN)}
end, {[], dict:new()}, Q),
- ok = persist_acks(none, QName, AckTags, PA),
+ ok = persist_acks(QName, IsDurable, none, AckTags, PA),
{Len, State #iv_state { len = 0, queue = queue:new() }}.
-publish(Msg, State = #iv_state { queue = Q, qname = QName, len = Len }) ->
- ok = persist_message(none, QName, Msg),
+publish(Msg, State = #iv_state { queue = Q, qname = QName, durable = IsDurable,
+ len = Len }) ->
+ ok = persist_message(QName, IsDurable, none, Msg),
State #iv_state { queue = queue:in({Msg, false}, Q), len = Len + 1 }.
publish_delivered(false, _Msg, State) ->
{blank_ack, State};
publish_delivered(true, Msg = #basic_message { guid = Guid },
- State = #iv_state { qname = QName, len = 0,
- pending_ack = PA }) ->
- ok = persist_message(none, QName, Msg),
- ok = persist_delivery(QName, Msg, false),
+ State = #iv_state { qname = QName, durable = IsDurable,
+ len = 0, pending_ack = PA }) ->
+ ok = persist_message(QName, IsDurable, none, Msg),
+ ok = persist_delivery(QName, IsDurable, false, Msg),
{Guid, State #iv_state { pending_ack = dict:store(Guid, Msg, PA) }}.
fetch(_AckRequired, State = #iv_state { len = 0 }) ->
{empty, State};
-fetch(AckRequired, State = #iv_state { queue = Q, qname = QName, len = Len,
+fetch(AckRequired, State = #iv_state { len = Len, queue = Q, qname = QName,
+ durable = IsDurable,
pending_ack = PA }) ->
{{value, {Msg = #basic_message { guid = Guid }, IsDelivered}}, Q1} =
queue:out(Q),
Len1 = Len - 1,
- ok = persist_delivery(QName, Msg, IsDelivered),
+ ok = persist_delivery(QName, IsDurable, IsDelivered, Msg),
PA1 = dict:store(Guid, Msg, PA),
{AckTag, PA2} = case AckRequired of
true -> {Guid, PA1};
- false -> ok = persist_acks(none, QName, [Guid], PA1),
+ false -> ok = persist_acks(QName, IsDurable, none,
+ [Guid], PA1),
{blank_ack, PA}
end,
{{Msg, IsDelivered, AckTag, Len1},
State #iv_state { queue = Q1, len = Len1, pending_ack = PA2 }}.
-ack(AckTags, State = #iv_state { qname = QName, pending_ack = PA }) ->
- ok = persist_acks(none, QName, AckTags, PA),
+ack(AckTags, State = #iv_state { qname = QName, durable = IsDurable,
+ pending_ack = PA }) ->
+ ok = persist_acks(QName, IsDurable, none, AckTags, PA),
PA1 = remove_acks(AckTags, PA),
State #iv_state { pending_ack = PA1 }.
-tx_publish(Txn, Msg, State = #iv_state { qname = QName }) ->
+tx_publish(Txn, Msg, State = #iv_state { qname = QName,
+ durable = IsDurable }) ->
Tx = #tx { pending_messages = Pubs } = lookup_tx(Txn),
store_tx(Txn, Tx #tx { pending_messages = [Msg | Pubs] }),
- ok = persist_message(Txn, QName, Msg),
+ ok = persist_message(QName, IsDurable, Txn, Msg),
State.
-tx_ack(Txn, AckTags, State = #iv_state { qname = QName, pending_ack = PA }) ->
+tx_ack(Txn, AckTags, State = #iv_state { qname = QName, durable = IsDurable,
+ pending_ack = PA }) ->
Tx = #tx { pending_acks = Acks } = lookup_tx(Txn),
store_tx(Txn, Tx #tx { pending_acks = [AckTags | Acks] }),
- ok = persist_acks(Txn, QName, AckTags, PA),
+ ok = persist_acks(QName, IsDurable, Txn, AckTags, PA),
State.
tx_rollback(Txn, State = #iv_state { qname = QName }) ->
@@ -228,32 +239,33 @@ do_if_persistent(F, Txn, QName) ->
%%----------------------------------------------------------------------------
-persist_message(_Txn, _QName, #basic_message { is_persistent = false }) ->
- ok;
-persist_message(Txn, QName, Msg) ->
+persist_message(QName, true, Txn, Msg = #basic_message {
+ is_persistent = true }) ->
Msg1 = Msg #basic_message {
%% don't persist any recoverable decoded properties,
%% rebuild from properties_bin on restore
content = rabbit_binary_parser:clear_decoded_content(
Msg #basic_message.content)},
persist_work(Txn, QName,
- [{publish, Msg1, {QName, Msg1 #basic_message.guid}}]).
+ [{publish, Msg1, {QName, Msg1 #basic_message.guid}}]);
+persist_message(_QName, _IsDurable, _Txn, _Msg) ->
+ ok.
-persist_delivery(_QName, #basic_message { is_persistent = false },
- _IsDelivered) ->
- ok;
-persist_delivery(_QName, _Message, true) ->
- ok;
-persist_delivery(QName, #basic_message { guid = Guid }, _IsDelivered) ->
- persist_work(none, QName, [{deliver, {QName, Guid}}]).
+persist_delivery(QName, true, false, #basic_message { is_persistent = true,
+ guid = Guid }) ->
+ persist_work(none, QName, [{deliver, {QName, Guid}}]);
+persist_delivery(_QName, _IsDurable, _IsDelivered, _Msg) ->
+ ok.
-persist_acks(Txn, QName, AckTags, PA) ->
+persist_acks(QName, true, Txn, AckTags, PA) ->
persist_work(Txn, QName,
[{ack, {QName, Guid}} || Guid <- AckTags,
begin
{ok, Msg} = dict:find(Guid, PA),
Msg #basic_message.is_persistent
- end]).
+ end]);
+persist_acks(_QName, _IsDurable, _Txn, _AckTags, _PA) ->
+ ok.
persist_work(_Txn,_QName, []) ->
ok;
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 723b818b41..9a911ab15d 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -537,18 +537,24 @@ pid_to_string(Pid) when is_pid(Pid) ->
%% inverse of above
string_to_pid(Str) ->
+ Err = {error, {invalid_pid_syntax, Str}},
%% The \ before the trailing $ is only there to keep emacs
%% font-lock from getting confused.
case re:run(Str, "^<(.*)\\.([0-9]+)\\.([0-9]+)>\$",
[{capture,all_but_first,list}]) of
{match, [NodeStr, IdStr, SerStr]} ->
- %% turn the triple into a pid - see pid_to_string
- <<131,NodeEnc/binary>> = term_to_binary(list_to_atom(NodeStr)),
+ %% the NodeStr atom might be quoted, so we have to parse
+ %% it rather than doing a simple list_to_atom
+ NodeAtom = case erl_scan:string(NodeStr) of
+ {ok, [{atom, _, X}], _} -> X;
+ {error, _, _} -> throw(Err)
+ end,
+ <<131,NodeEnc/binary>> = term_to_binary(NodeAtom),
Id = list_to_integer(IdStr),
Ser = list_to_integer(SerStr),
binary_to_term(<<131,103,NodeEnc/binary,Id:32,Ser:32,0:8>>);
nomatch ->
- throw({error, {invalid_pid_syntax, Str}})
+ throw(Err)
end.
version_compare(A, B, lte) ->
diff --git a/src/rabbit_plugin_activator.erl b/src/rabbit_plugin_activator.erl
index 274981efed..ef3c5cc250 100644
--- a/src/rabbit_plugin_activator.erl
+++ b/src/rabbit_plugin_activator.erl
@@ -108,6 +108,7 @@ start() ->
WApp == stdlib;
WApp == kernel;
WApp == sasl;
+ WApp == crypto;
WApp == os_mon -> false;
_ -> true
end]),
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index d9e6de0553..8e7cd39f8a 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -52,13 +52,15 @@
-define(NORMAL_TIMEOUT, 3).
-define(CLOSING_TIMEOUT, 1).
-define(CHANNEL_TERMINATION_TIMEOUT, 3).
+-define(SILENT_CLOSE_DELAY, 3).
%% set to zero once QPid fix their negotiation
-define(FRAME_MAX, 131072).
-define(CHANNEL_MAX, 0).
%---------------------------------------------------------------------------
--record(v1, {sock, connection, callback, recv_ref, connection_state}).
+-record(v1, {sock, connection, callback, recv_ref, connection_state,
+ queue_collector}).
-define(INFO_KEYS,
[pid, address, port, peer_address, peer_port,
@@ -236,6 +238,7 @@ start_connection(Parent, Deb, Sock, SockTransform) ->
erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(),
handshake_timeout),
ProfilingValue = setup_profiling(),
+ {ok, Collector} = rabbit_reader_queue_collector:start_link(),
try
mainloop(Parent, Deb, switch_callback(
#v1{sock = ClientSock,
@@ -247,7 +250,8 @@ start_connection(Parent, Deb, Sock, SockTransform) ->
client_properties = none},
callback = uninitialized_callback,
recv_ref = none,
- connection_state = pre_init},
+ connection_state = pre_init,
+ queue_collector = Collector},
handshake, 8))
catch
Ex -> (if Ex == connection_closed_abruptly ->
@@ -265,7 +269,9 @@ start_connection(Parent, Deb, Sock, SockTransform) ->
%% output to be sent, which results in unnecessary delays.
%%
%% gen_tcp:close(ClientSock),
- teardown_profiling(ProfilingValue)
+ teardown_profiling(ProfilingValue),
+ rabbit_reader_queue_collector:shutdown(Collector),
+ rabbit_misc:unlink_and_capture_exit(Collector)
end,
done.
@@ -428,11 +434,17 @@ wait_for_channel_termination(N, TimerRef) ->
exit(channel_termination_timeout)
end.
-maybe_close(State = #v1{connection_state = closing}) ->
+maybe_close(State = #v1{connection_state = closing,
+ queue_collector = Collector}) ->
case all_channels() of
- [] -> ok = send_on_channel0(
- State#v1.sock, #'connection.close_ok'{}),
- close_connection(State);
+ [] ->
+ %% Spec says "Exclusive queues may only be accessed by the current
+ %% connection, and are deleted when that connection closes."
+ %% This does not strictly imply synchrony, but in practice it seems
+ %% to be what people assume.
+ rabbit_reader_queue_collector:delete_all(Collector),
+ ok = send_on_channel0(State#v1.sock, #'connection.close_ok'{}),
+ close_connection(State);
_ -> State
end;
maybe_close(State) ->
@@ -576,7 +588,11 @@ handle_method0(MethodName, FieldsBin, State) ->
end,
case State#v1.connection_state of
running -> send_exception(State, 0, CompleteReason);
- Other -> throw({channel0_error, Other, CompleteReason})
+ %% We don't trust the client at this point - force
+ %% them to wait for a bit so they can't DOS us with
+ %% repeated failed logins etc.
+ Other -> timer:sleep(?SILENT_CLOSE_DELAY * 1000),
+ throw({channel0_error, Other, CompleteReason})
end
end.
@@ -699,15 +715,16 @@ i(Item, #v1{}) ->
%%--------------------------------------------------------------------------
-send_to_new_channel(Channel, AnalyzedFrame, State) ->
+send_to_new_channel(Channel, AnalyzedFrame,
+ State = #v1{queue_collector = Collector}) ->
#v1{sock = Sock, connection = #connection{
frame_max = FrameMax,
user = #user{username = Username},
vhost = VHost}} = State,
WriterPid = rabbit_writer:start(Sock, Channel, FrameMax),
ChPid = rabbit_framing_channel:start_link(
- fun rabbit_channel:start_link/5,
- [Channel, self(), WriterPid, Username, VHost]),
+ fun rabbit_channel:start_link/6,
+ [Channel, self(), WriterPid, Username, VHost, Collector]),
put({channel, Channel}, {chpid, ChPid}),
put({chpid, ChPid}, {channel, Channel}),
ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame).
diff --git a/src/rabbit_reader_queue_collector.erl b/src/rabbit_reader_queue_collector.erl
new file mode 100644
index 0000000000..841549e98f
--- /dev/null
+++ b/src/rabbit_reader_queue_collector.erl
@@ -0,0 +1,108 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_reader_queue_collector).
+
+-behaviour(gen_server).
+
+-export([start_link/0, register_exclusive_queue/2, delete_all/1, shutdown/1]).
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-record(state, {exclusive_queues}).
+
+-include("rabbit.hrl").
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(start_link/0 :: () -> {'ok', pid()}).
+-spec(register_exclusive_queue/2 :: (pid(), amqqueue()) -> 'ok').
+-spec(delete_all/1 :: (pid()) -> 'ok').
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+start_link() ->
+ gen_server:start_link(?MODULE, [], []).
+
+register_exclusive_queue(CollectorPid, Q) ->
+ gen_server:call(CollectorPid, {register_exclusive_queue, Q}, infinity).
+
+delete_all(CollectorPid) ->
+ gen_server:call(CollectorPid, delete_all, infinity).
+
+shutdown(CollectorPid) ->
+ gen_server:call(CollectorPid, shutdown, infinity).
+
+%%----------------------------------------------------------------------------
+
+init([]) ->
+ {ok, #state{exclusive_queues = dict:new()}}.
+
+%%--------------------------------------------------------------------------
+
+handle_call({register_exclusive_queue, Q}, _From,
+ State = #state{exclusive_queues = Queues}) ->
+ MonitorRef = erlang:monitor(process, Q#amqqueue.pid),
+ {reply, ok,
+ State#state{exclusive_queues = dict:store(MonitorRef, Q, Queues)}};
+
+handle_call(delete_all, _From,
+ State = #state{exclusive_queues = ExclusiveQueues}) ->
+ [rabbit_misc:with_exit_handler(
+ fun() -> ok end,
+ fun() ->
+ erlang:demonitor(MonitorRef),
+ rabbit_amqqueue:delete(Q, false, false)
+ end)
+ || {MonitorRef, Q} <- dict:to_list(ExclusiveQueues)],
+ {reply, ok, State};
+
+handle_call(shutdown, _From, State) ->
+ {stop, normal, ok, State}.
+
+handle_cast(_Msg, State) ->
+ {noreply, State}.
+
+handle_info({'DOWN', MonitorRef, process, _DownPid, _Reason},
+ State = #state{exclusive_queues = ExclusiveQueues}) ->
+ {noreply, State#state{exclusive_queues =
+ dict:erase(MonitorRef, ExclusiveQueues)}}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 08b4cc75c8..5567cdbe36 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -788,7 +788,8 @@ test_user_management() ->
test_server_status() ->
%% create a few things so there is some useful information to list
Writer = spawn(fun () -> receive shutdown -> ok end end),
- Ch = rabbit_channel:start_link(1, self(), Writer, <<"user">>, <<"/">>),
+ Ch = rabbit_channel:start_link(1, self(), Writer, <<"user">>, <<"/">>,
+ self()),
[Q, Q2] = [#amqqueue{} = rabbit_amqqueue:declare(
rabbit_misc:r(<<"/">>, queue, Name),
false, false, [], none) ||
@@ -894,10 +895,11 @@ test_delegates_async(SecondaryNode) ->
passed.
-make_responder(FMsg) ->
+make_responder(FMsg) -> make_responder(FMsg, timeout).
+make_responder(FMsg, Throw) ->
fun() ->
receive Msg -> FMsg(Msg)
- after 1000 -> throw(timeout)
+ after 1000 -> throw(Throw)
end
end.
@@ -931,17 +933,21 @@ test_delegates_sync(SecondaryNode) ->
gen_server:reply(From, response)
end),
+ BadResponder = make_responder(fun({'$gen_call', From, invoked}) ->
+ gen_server:reply(From, response)
+ end, bad_responder_died),
+
response = delegate:invoke(spawn(Responder), Sender),
response = delegate:invoke(spawn(SecondaryNode, Responder), Sender),
- must_exit(fun() -> delegate:invoke(spawn(Responder), BadSender) end),
+ must_exit(fun() -> delegate:invoke(spawn(BadResponder), BadSender) end),
must_exit(fun() ->
- delegate:invoke(spawn(SecondaryNode, Responder), BadSender) end),
+ delegate:invoke(spawn(SecondaryNode, BadResponder), BadSender) end),
LocalGoodPids = spawn_responders(node(), Responder, 2),
RemoteGoodPids = spawn_responders(SecondaryNode, Responder, 2),
- LocalBadPids = spawn_responders(node(), Responder, 2),
- RemoteBadPids = spawn_responders(SecondaryNode, Responder, 2),
+ LocalBadPids = spawn_responders(node(), BadResponder, 2),
+ RemoteBadPids = spawn_responders(SecondaryNode, BadResponder, 2),
{GoodRes, []} = delegate:invoke(LocalGoodPids ++ RemoteGoodPids, Sender),
true = lists:all(fun ({_, response}) -> true end, GoodRes),
diff --git a/src/tcp_acceptor.erl b/src/tcp_acceptor.erl
index 3b23daa5c1..cc4982c9cb 100644
--- a/src/tcp_acceptor.erl
+++ b/src/tcp_acceptor.erl
@@ -75,6 +75,13 @@ handle_info({inet_async, LSock, Ref, {ok, Sock}},
error_logger:info_msg("accepted TCP connection on ~s:~p from ~s:~p~n",
[inet_parse:ntoa(Address), Port,
inet_parse:ntoa(PeerAddress), PeerPort]),
+ %% In the event that somebody floods us with connections we can spew
+ %% the above message at error_logger faster than it can keep up.
+ %% So error_logger's mailbox grows unbounded until we eat all the
+ %% memory available and crash. So here's a meaningless synchronous call
+ %% to the underlying gen_event mechanism - when it returns the mailbox
+ %% is drained.
+ gen_event:which_handlers(error_logger),
%% handle
file_handle_cache:release_on_death(apply(M, F, A ++ [Sock]))
catch {inet_error, Reason} ->