summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--docs/rabbitmqctl.1.xml2
-rw-r--r--include/rabbit.hrl14
-rw-r--r--src/delegate.erl45
-rw-r--r--src/rabbit_amqqueue.erl43
-rw-r--r--src/rabbit_amqqueue_process.erl190
-rw-r--r--src/rabbit_channel.erl138
-rw-r--r--src/rabbit_invariable_queue.erl82
-rw-r--r--src/rabbit_misc.erl12
-rw-r--r--src/rabbit_reader.erl36
-rw-r--r--src/rabbit_reader_queue_collector.erl108
-rw-r--r--src/rabbit_tests.erl26
11 files changed, 415 insertions, 281 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index 5e2668c1a6..a2038cf0e9 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -271,7 +271,7 @@
<variablelist>
<varlistentry>
- <term><cmdsynopsis><command>cluster</command> <arg choice="req"><replaceable>clusternode</replaceable></arg></cmdsynopsis></term>
+ <term><cmdsynopsis><command>cluster</command> <arg choice="req" role="usage-option-list"><replaceable>clusternode</replaceable> ...</arg></cmdsynopsis></term>
<listitem>
<variablelist>
<varlistentry>
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 145f6104ae..0d75310b11 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -51,7 +51,8 @@
-record(exchange, {name, type, durable, auto_delete, arguments}).
--record(amqqueue, {name, durable, auto_delete, arguments, pid}).
+-record(amqqueue, {name, durable, auto_delete, exclusive_owner = none,
+ arguments, pid}).
%% mnesia doesn't like unary records, so we add a dummy 'value' field
-record(route, {binding, value = const}).
@@ -104,11 +105,12 @@
write :: regexp(),
read :: regexp()}).
-type(amqqueue() ::
- #amqqueue{name :: queue_name(),
- durable :: boolean(),
- auto_delete :: boolean(),
- arguments :: amqp_table(),
- pid :: maybe(pid())}).
+ #amqqueue{name :: queue_name(),
+ durable :: boolean(),
+ auto_delete :: boolean(),
+ exclusive_owner :: maybe(pid()),
+ arguments :: amqp_table(),
+ pid :: maybe(pid())}).
-type(exchange() ::
#exchange{name :: exchange_name(),
type :: exchange_type(),
diff --git a/src/delegate.erl b/src/delegate.erl
index 12eb814f8f..98353453ff 100644
--- a/src/delegate.erl
+++ b/src/delegate.erl
@@ -63,7 +63,7 @@ start_link(Hash) ->
gen_server2:start_link({local, server(Hash)}, ?MODULE, [], []).
invoke(Pid, Fun) when is_pid(Pid) ->
- [Res] = invoke_per_node([{node(Pid), [Pid]}], Fun),
+ [Res] = invoke_per_node(split_delegate_per_node([Pid]), Fun),
case Res of
{ok, Result, _} ->
Result;
@@ -83,7 +83,7 @@ invoke(Pids, Fun) when is_list(Pids) ->
invoke_per_node(split_delegate_per_node(Pids), Fun)).
invoke_no_result(Pid, Fun) when is_pid(Pid) ->
- invoke_no_result_per_node([{node(Pid), [Pid]}], Fun),
+ invoke_no_result_per_node(split_delegate_per_node([Pid]), Fun),
ok;
invoke_no_result(Pids, Fun) when is_list(Pids) ->
@@ -99,32 +99,37 @@ internal_cast(Node, Thunk) when is_atom(Node) ->
gen_server2:cast({remote_server(Node), Node}, {thunk, Thunk}).
split_delegate_per_node(Pids) ->
- orddict:to_list(
- lists:foldl(
- fun (Pid, D) ->
- orddict:update(node(Pid),
- fun (Pids1) -> [Pid | Pids1] end,
- [Pid], D)
- end,
- orddict:new(), Pids)).
+ LocalNode = node(),
+ {Local, Remote} =
+ lists:foldl(
+ fun (Pid, {L, D}) ->
+ Node = node(Pid),
+ case Node of
+ LocalNode -> {[Pid|L], D};
+ _ -> {L, orddict:append(Node, Pid, D)}
+ end
+ end,
+ {[], orddict:new()}, Pids),
+ {Local, orddict:to_list(Remote)}.
-invoke_per_node([{Node, Pids}], Fun) when Node == node() ->
- safe_invoke(Pids, Fun);
invoke_per_node(NodePids, Fun) ->
lists:append(delegate_per_node(NodePids, Fun, fun internal_call/2)).
-invoke_no_result_per_node([{Node, Pids}], Fun) when Node == node() ->
- %% This is not actually async! However, in practice Fun will
- %% always be something that does a gen_server:cast or similar, so
- %% I don't think it's a problem unless someone misuses this
- %% function. Making this *actually* async would be painful as we
- %% can't spawn at this point or we break effect ordering.
- safe_invoke(Pids, Fun);
invoke_no_result_per_node(NodePids, Fun) ->
delegate_per_node(NodePids, Fun, fun internal_cast/2),
ok.
-delegate_per_node(NodePids, Fun, DelegateFun) ->
+delegate_per_node({LocalPids, NodePids}, Fun, DelegateFun) ->
+ %% In the case where DelegateFun is internal_cast, the safe_invoke
+ %% is not actually async! However, in practice Fun will always be
+ %% something that does a gen_server:cast or similar, so I don't
+ %% think it's a problem unless someone misuses this
+ %% function. Making this *actually* async would be painful as we
+ %% can't spawn at this point or we break effect ordering.
+ [safe_invoke(LocalPids, Fun)|
+ delegate_per_remote_node(NodePids, Fun, DelegateFun)].
+
+delegate_per_remote_node(NodePids, Fun, DelegateFun) ->
Self = self(),
%% Note that this is unsafe if the Fun requires reentrancy to the
%% local_server. I.e. if self() == local_server(Node) then we'll
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 7b88c45d26..483b5a93b7 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -31,7 +31,7 @@
-module(rabbit_amqqueue).
--export([start/0, declare/4, delete/3, purge/1]).
+-export([start/0, declare/5, delete/3, purge/1]).
-export([internal_declare/2, internal_delete/1,
maybe_run_queue_via_backing_queue/2,
update_ram_duration/1, set_ram_duration_target/2,
@@ -41,8 +41,7 @@
stat/1, stat_all/0, deliver/2, requeue/3, ack/4]).
-export([list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]).
-export([consumers/1, consumers_all/1]).
--export([claim_queue/2]).
--export([basic_get/3, basic_consume/8, basic_cancel/4]).
+-export([basic_get/3, basic_consume/7, basic_cancel/4]).
-export([notify_sent/2, unblock/2, flush_all/2]).
-export([commit_all/3, rollback_all/3, notify_down_all/2, limit_all/3]).
-export([on_node_down/1]).
@@ -66,8 +65,8 @@
'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}).
-spec(start/0 :: () -> 'ok').
--spec(declare/4 :: (queue_name(), boolean(), boolean(), amqp_table()) ->
- amqqueue()).
+-spec(declare/5 :: (queue_name(), boolean(), boolean(), amqp_table(),
+ maybe(pid())) -> amqqueue()).
-spec(lookup/1 :: (queue_name()) -> {'ok', amqqueue()} | not_found()).
-spec(with/2 :: (queue_name(), qfun(A)) -> A | not_found()).
-spec(with_or_die/2 :: (queue_name(), qfun(A)) -> A).
@@ -97,14 +96,12 @@
-spec(rollback_all/3 :: ([pid()], txn(), pid()) -> 'ok').
-spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()).
-spec(limit_all/3 :: ([pid()], pid(), pid() | 'undefined') -> ok_or_errors()).
--spec(claim_queue/2 :: (amqqueue(), pid()) -> 'ok' | 'locked').
-spec(basic_get/3 :: (amqqueue(), pid(), boolean()) ->
{'ok', non_neg_integer(), qmsg()} | 'empty').
--spec(basic_consume/8 ::
- (amqqueue(), boolean(), pid(), pid(), pid() | 'undefined', ctag(),
+-spec(basic_consume/7 ::
+ (amqqueue(), boolean(), pid(), pid() | 'undefined', ctag(),
boolean(), any()) ->
- 'ok' | {'error', 'queue_owned_by_another_connection' |
- 'exclusive_consume_unavailable'}).
+ 'ok' | {'error', 'exclusive_consume_unavailable'}).
-spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok').
-spec(notify_sent/2 :: (pid(), pid()) -> 'ok').
-spec(unblock/2 :: (pid(), pid()) -> 'ok').
@@ -148,11 +145,12 @@ recover_durable_queues(DurableQueues) ->
Qs = [start_queue_process(Q) || Q <- DurableQueues],
[Q || Q <- Qs, gen_server2:call(Q#amqqueue.pid, {init, true}) == Q].
-declare(QueueName, Durable, AutoDelete, Args) ->
+declare(QueueName, Durable, AutoDelete, Args, Owner) ->
Q = start_queue_process(#amqqueue{name = QueueName,
durable = Durable,
auto_delete = AutoDelete,
arguments = Args,
+ exclusive_owner = Owner,
pid = none}),
case gen_server2:call(Q#amqqueue.pid, {init, false}) of
not_found -> rabbit_misc:not_found(QueueName);
@@ -298,15 +296,12 @@ limit_all(QPids, ChPid, LimiterPid) ->
gen_server2:cast(QPid, {limit, ChPid, LimiterPid})
end).
-claim_queue(#amqqueue{pid = QPid}, ReaderPid) ->
- delegate_call(QPid, {claim_queue, ReaderPid}, infinity).
-
basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) ->
delegate_call(QPid, {basic_get, ChPid, NoAck}, infinity).
-basic_consume(#amqqueue{pid = QPid}, NoAck, ReaderPid, ChPid, LimiterPid,
+basic_consume(#amqqueue{pid = QPid}, NoAck, ChPid, LimiterPid,
ConsumerTag, ExclusiveConsume, OkMsg) ->
- delegate_call(QPid, {basic_consume, NoAck, ReaderPid, ChPid,
+ delegate_call(QPid, {basic_consume, NoAck, ChPid,
LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg},
infinity).
@@ -324,19 +319,21 @@ flush_all(QPids, ChPid) ->
delegate:invoke_no_result(
QPids, fun (QPid) -> gen_server2:cast(QPid, {flush, ChPid}) end).
+internal_delete1(QueueName) ->
+ ok = mnesia:delete({rabbit_queue, QueueName}),
+ ok = mnesia:delete({rabbit_durable_queue, QueueName}),
+ %% we want to execute some things, as
+ %% decided by rabbit_exchange, after the
+ %% transaction.
+ rabbit_exchange:delete_queue_bindings(QueueName).
+
internal_delete(QueueName) ->
case
rabbit_misc:execute_mnesia_transaction(
fun () ->
case mnesia:wread({rabbit_queue, QueueName}) of
[] -> {error, not_found};
- [_] ->
- ok = mnesia:delete({rabbit_queue, QueueName}),
- ok = mnesia:delete({rabbit_durable_queue, QueueName}),
- %% we want to execute some things, as
- %% decided by rabbit_exchange, after the
- %% transaction.
- rabbit_exchange:delete_queue_bindings(QueueName)
+ [_] -> internal_delete1(QueueName)
end
end) of
Err = {error, _} -> Err;
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index f12e1b70f8..3283cb6679 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -50,7 +50,6 @@
% Queue's state
-record(q, {q,
- owner,
exclusive_consumer,
has_had_consumers,
backing_queue,
@@ -104,7 +103,6 @@ init(Q) ->
{ok, BQ} = application:get_env(backing_queue_module),
{ok, #q{q = Q#amqqueue{pid = self()},
- owner = none,
exclusive_consumer = none,
has_had_consumers = false,
backing_queue = BQ,
@@ -134,6 +132,23 @@ code_change(_OldVsn, State, _Extra) ->
%%----------------------------------------------------------------------------
+declare(Recover, From,
+ State = #q{q = Q = #amqqueue{name = QName, durable = IsDurable},
+ backing_queue = BQ, backing_queue_state = undefined}) ->
+ case rabbit_amqqueue:internal_declare(Q, Recover) of
+ not_found -> {stop, normal, not_found, State};
+ Q -> gen_server2:reply(From, Q),
+ ok = file_handle_cache:register_callback(
+ rabbit_amqqueue, set_maximum_since_use,
+ [self()]),
+ ok = rabbit_memory_monitor:register(
+ self(), {rabbit_amqqueue,
+ set_ram_duration_target, [self()]}),
+ BQS = BQ:init(QName, IsDurable, Recover),
+ noreply(State#q{backing_queue_state = BQS});
+ Q1 -> {stop, normal, Q1, State}
+ end.
+
terminate_shutdown(Fun, State) ->
State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
stop_sync_timer(stop_rate_timer(State)),
@@ -433,10 +448,6 @@ cancel_holder(ChPid, ConsumerTag, {ChPid, ConsumerTag}) ->
cancel_holder(_ChPid, _ConsumerTag, Holder) ->
Holder.
-check_queue_owner(none, _) -> ok;
-check_queue_owner({ReaderPid, _}, ReaderPid) -> ok;
-check_queue_owner({_, _}, _) -> mismatch.
-
check_exclusive_access({_ChPid, _ConsumerTag}, _ExclusiveConsume, _State) ->
in_use;
check_exclusive_access(none, false, _State) ->
@@ -488,10 +499,10 @@ i(auto_delete, #q{q = #amqqueue{auto_delete = AutoDelete}}) -> AutoDelete;
i(arguments, #q{q = #amqqueue{arguments = Arguments}}) -> Arguments;
i(pid, _) ->
self();
-i(owner_pid, #q{owner = none}) ->
+i(owner_pid, #q{q = #amqqueue{exclusive_owner = none}}) ->
'';
-i(owner_pid, #q{owner = {ReaderPid, _MonitorRef}}) ->
- ReaderPid;
+i(owner_pid, #q{q = #amqqueue{exclusive_owner = ExclusiveOwner}}) ->
+ ExclusiveOwner;
i(exclusive_consumer_pid, #q{exclusive_consumer = none}) ->
'';
i(exclusive_consumer_pid, #q{exclusive_consumer = {ChPid, _ConsumerTag}}) ->
@@ -520,25 +531,24 @@ i(Item, _) ->
%---------------------------------------------------------------------------
handle_call({init, Recover}, From,
- State = #q{q = Q = #amqqueue{name = QName, durable = IsDurable},
- backing_queue = BQ, backing_queue_state = undefined}) ->
- %% TODO: If we're exclusively owned && our owner isn't alive &&
- %% Recover then we should BQ:init and then {stop, normal,
- %% not_found, State}, relying on terminate to delete the queue.
- case rabbit_amqqueue:internal_declare(Q, Recover) of
- not_found ->
- {stop, normal, not_found, State};
- Q ->
- gen_server2:reply(From, Q),
- ok = file_handle_cache:register_callback(
- rabbit_amqqueue, set_maximum_since_use, [self()]),
- ok = rabbit_memory_monitor:register(
- self(),
- {rabbit_amqqueue, set_ram_duration_target, [self()]}),
- noreply(State#q{backing_queue_state =
- BQ:init(QName, IsDurable, Recover)});
- Q1 ->
- {stop, normal, Q1, State}
+ State = #q{q = #amqqueue{exclusive_owner = none}}) ->
+ declare(Recover, From, State);
+
+handle_call({init, Recover}, From,
+ State = #q{q = #amqqueue{exclusive_owner = Owner}}) ->
+ case rpc:call(node(Owner), erlang, is_process_alive, [Owner]) of
+ true -> erlang:monitor(process, Owner),
+ declare(Recover, From, State);
+ _ -> #q{q = #amqqueue{name = QName, durable = IsDurable},
+ backing_queue = BQ, backing_queue_state = undefined} = State,
+ case Recover of
+ true -> ok;
+ _ -> rabbit_log:warning(
+ "Queue ~p exclusive owner went away~n", [QName])
+ end,
+ BQS = BQ:init(QName, IsDurable, Recover),
+ %% Rely on terminate to delete the queue.
+ {stop, normal, not_found, State#q{backing_queue_state = BQS}}
end;
handle_call(info, _From, State) ->
@@ -613,51 +623,44 @@ handle_call({basic_get, ChPid, NoAck}, _From,
reply({ok, Remaining, Msg}, State#q{backing_queue_state = BQS1})
end;
-handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid,
+handle_call({basic_consume, NoAck, ChPid, LimiterPid,
ConsumerTag, ExclusiveConsume, OkMsg},
- _From, State = #q{owner = Owner,
- exclusive_consumer = ExistingHolder}) ->
- case check_queue_owner(Owner, ReaderPid) of
- mismatch ->
- reply({error, queue_owned_by_another_connection}, State);
+ _From, State = #q{exclusive_consumer = ExistingHolder}) ->
+ case check_exclusive_access(ExistingHolder, ExclusiveConsume,
+ State) of
+ in_use ->
+ reply({error, exclusive_consume_unavailable}, State);
ok ->
- case check_exclusive_access(ExistingHolder, ExclusiveConsume,
- State) of
- in_use ->
- reply({error, exclusive_consume_unavailable}, State);
- ok ->
- C = #cr{consumer_count = ConsumerCount} = ch_record(ChPid),
- Consumer = #consumer{tag = ConsumerTag,
- ack_required = not NoAck},
- store_ch_record(C#cr{consumer_count = ConsumerCount +1,
- limiter_pid = LimiterPid}),
- case ConsumerCount of
- 0 -> ok = rabbit_limiter:register(LimiterPid, self());
- _ -> ok
- end,
- ExclusiveConsumer = case ExclusiveConsume of
- true -> {ChPid, ConsumerTag};
- false -> ExistingHolder
- end,
- State1 = State#q{has_had_consumers = true,
- exclusive_consumer = ExclusiveConsumer},
- ok = maybe_send_reply(ChPid, OkMsg),
- State2 =
- case is_ch_blocked(C) of
- true -> State1#q{
- blocked_consumers =
- add_consumer(
- ChPid, Consumer,
- State1#q.blocked_consumers)};
- false -> run_message_queue(
- State1#q{
- active_consumers =
- add_consumer(
- ChPid, Consumer,
- State1#q.active_consumers)})
- end,
- reply(ok, State2)
- end
+ C = #cr{consumer_count = ConsumerCount} = ch_record(ChPid),
+ Consumer = #consumer{tag = ConsumerTag,
+ ack_required = not NoAck},
+ store_ch_record(C#cr{consumer_count = ConsumerCount +1,
+ limiter_pid = LimiterPid}),
+ ok = case ConsumerCount of
+ 0 -> rabbit_limiter:register(LimiterPid, self());
+ _ -> ok
+ end,
+ ExclusiveConsumer = if ExclusiveConsume -> {ChPid, ConsumerTag};
+ true -> ExistingHolder
+ end,
+ State1 = State#q{has_had_consumers = true,
+ exclusive_consumer = ExclusiveConsumer},
+ ok = maybe_send_reply(ChPid, OkMsg),
+ State2 =
+ case is_ch_blocked(C) of
+ true -> State1#q{
+ blocked_consumers =
+ add_consumer(
+ ChPid, Consumer,
+ State1#q.blocked_consumers)};
+ false -> run_message_queue(
+ State1#q{
+ active_consumers =
+ add_consumer(
+ ChPid, Consumer,
+ State1#q.active_consumers)})
+ end,
+ reply(ok, State2)
end;
handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
@@ -713,29 +716,6 @@ handle_call(purge, _From, State = #q{backing_queue = BQ,
{Count, BQS1} = BQ:purge(BQS),
reply({ok, Count}, State#q{backing_queue_state = BQS1});
-handle_call({claim_queue, ReaderPid}, _From,
- State = #q{owner = Owner, exclusive_consumer = Holder}) ->
- case Owner of
- none ->
- case check_exclusive_access(Holder, true, State) of
- in_use ->
- %% FIXME: Is this really the right answer? What if
- %% an active consumer's reader is actually the
- %% claiming pid? Should that be allowed? In order
- %% to check, we'd need to hold not just the ch
- %% pid for each consumer, but also its reader
- %% pid...
- reply(locked, State);
- ok ->
- MonitorRef = erlang:monitor(process, ReaderPid),
- reply(ok, State#q{owner = {ReaderPid, MonitorRef}})
- end;
- {ReaderPid, _MonitorRef} ->
- reply(ok, State);
- _ ->
- reply(locked, State)
- end;
-
handle_call({maybe_run_queue_via_backing_queue, Fun}, _From, State) ->
reply(ok, maybe_run_queue_via_backing_queue(Fun, State)).
@@ -825,19 +805,15 @@ handle_cast({set_maximum_since_use, Age}, State) ->
ok = file_handle_cache:set_maximum_since_use(Age),
noreply(State).
-handle_info({'DOWN', MonitorRef, process, DownPid, _Reason},
- State = #q{owner = {DownPid, MonitorRef}}) ->
- %% We know here that there are no consumers on this queue that are
- %% owned by other pids than the one that just went down, so since
- %% exclusive in some sense implies autodelete, we delete the queue
- %% here. The other way of implementing the "exclusive implies
- %% autodelete" feature is to actually set autodelete when an
- %% exclusive declaration is seen, but this has the problem that
- %% the python tests rely on the queue not going away after a
- %% basic.cancel when the queue was declared exclusive and
- %% nonautodelete.
- NewState = State#q{owner = none},
- {stop, normal, NewState};
+handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason},
+ State = #q{q = #amqqueue{exclusive_owner = DownPid}}) ->
+ %% Exclusively owned queues must disappear with their owner. In
+ %% the case of clean shutdown we delete the queue synchronously in
+ %% the reader - although not required by the spec this seems to
+ %% match what people expect (see bug 21824). However we need this
+ %% monitor-and-async- delete in case the connection goes away
+ %% unexpectedly.
+ {stop, normal, State};
handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) ->
case handle_ch_down(DownPid, State) of
{ok, NewState} -> noreply(NewState);
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index a48db9c8b3..31bb54c0f4 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -35,7 +35,7 @@
-behaviour(gen_server2).
--export([start_link/5, do/2, do/3, shutdown/1]).
+-export([start_link/6, do/2, do/3, shutdown/1]).
-export([send_command/2, deliver/4, conserve_memory/2, flushed/2]).
-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]).
@@ -46,7 +46,7 @@
transaction_id, tx_participants, next_tag,
uncommitted_ack_q, unacked_message_q,
username, virtual_host, most_recently_declared_queue,
- consumer_mapping, blocking}).
+ consumer_mapping, blocking, queue_collector_pid}).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
@@ -66,8 +66,8 @@
-ifdef(use_specs).
--spec(start_link/5 ::
- (channel_number(), pid(), pid(), username(), vhost()) -> pid()).
+-spec(start_link/6 ::
+ (channel_number(), pid(), pid(), username(), vhost(), pid()) -> pid()).
-spec(do/2 :: (pid(), amqp_method()) -> 'ok').
-spec(do/3 :: (pid(), amqp_method(), maybe(content())) -> 'ok').
-spec(shutdown/1 :: (pid()) -> 'ok').
@@ -86,10 +86,10 @@
%%----------------------------------------------------------------------------
-start_link(Channel, ReaderPid, WriterPid, Username, VHost) ->
+start_link(Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid) ->
{ok, Pid} = gen_server2:start_link(
?MODULE, [Channel, ReaderPid, WriterPid,
- Username, VHost], []),
+ Username, VHost, CollectorPid], []),
Pid.
do(Pid, Method) ->
@@ -135,7 +135,7 @@ info_all(Items) ->
%%---------------------------------------------------------------------------
-init([Channel, ReaderPid, WriterPid, Username, VHost]) ->
+init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid]) ->
process_flag(trap_exit, true),
link(WriterPid),
ok = pg_local:join(rabbit_channels, self()),
@@ -153,7 +153,8 @@ init([Channel, ReaderPid, WriterPid, Username, VHost]) ->
virtual_host = VHost,
most_recently_declared_queue = <<>>,
consumer_mapping = dict:new(),
- blocking = dict:new()},
+ blocking = dict:new(),
+ queue_collector_pid = CollectorPid},
hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -298,6 +299,22 @@ check_write_permitted(Resource, #ch{ username = Username}) ->
check_read_permitted(Resource, #ch{ username = Username}) ->
check_resource_access(Username, Resource, read).
+with_exclusive_access_or_die(QName, ReaderPid, F) ->
+ case rabbit_amqqueue:with_or_die(
+ QName, fun (Q = #amqqueue{exclusive_owner = Owner})
+ when Owner =:= none orelse Owner =:= ReaderPid ->
+ F(Q);
+ (_) ->
+ {error, wrong_exclusive_owner}
+ end) of
+ {error, wrong_exclusive_owner} ->
+ rabbit_misc:protocol_error(
+ resource_locked, "cannot obtain exclusive access to locked ~s",
+ [rabbit_misc:rs(QName)]);
+ Other ->
+ Other
+ end.
+
expand_queue_name_shortcut(<<>>, #ch{ most_recently_declared_queue = <<>> }) ->
rabbit_misc:protocol_error(
not_allowed, "no previously declared queue", []);
@@ -483,11 +500,11 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
%% In order to ensure that the consume_ok gets sent before
%% any messages are sent to the consumer, we get the queue
%% process to send the consume_ok on our behalf.
- case rabbit_amqqueue:with_or_die(
- QueueName,
+ case with_exclusive_access_or_die(
+ QueueName, ReaderPid,
fun (Q) ->
rabbit_amqqueue:basic_consume(
- Q, NoAck, ReaderPid, self(), LimiterPid,
+ Q, NoAck, self(), LimiterPid,
ActualConsumerTag, ExclusiveConsume,
ok_msg(NoWait, #'basic.consume_ok'{
consumer_tag = ActualConsumerTag}))
@@ -497,14 +514,6 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
dict:store(ActualConsumerTag,
QueueName,
ConsumerMapping)}};
- {error, queue_owned_by_another_connection} ->
- %% The spec is silent on which exception to use
- %% here. This seems reasonable?
- %% FIXME: check this
-
- rabbit_misc:protocol_error(
- resource_locked, "~s owned by another connection",
- [rabbit_misc:rs(QueueName)]);
{error, exclusive_consume_unavailable} ->
rabbit_misc:protocol_error(
access_refused, "~s in exclusive use",
@@ -674,34 +683,40 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin,
return_ok(State, NoWait, #'exchange.delete_ok'{})
end;
-handle_method(#'queue.declare'{queue = QueueNameBin,
- passive = false,
- durable = Durable,
- exclusive = ExclusiveDeclare,
+handle_method(#'queue.declare'{queue = QueueNameBin,
+ passive = false,
+ durable = Durable,
+ exclusive = ExclusiveDeclare,
auto_delete = AutoDelete,
- nowait = NoWait,
- arguments = Args},
- _, State = #ch { virtual_host = VHostPath,
- reader_pid = ReaderPid }) ->
- %% FIXME: atomic create&claim
+ nowait = NoWait,
+ arguments = Args},
+ _, State = #ch{virtual_host = VHostPath,
+ reader_pid = ReaderPid,
+ queue_collector_pid = CollectorPid}) ->
+ Owner = case ExclusiveDeclare of
+ true -> ReaderPid;
+ false -> none
+ end,
+ %% We use this in both branches, because queue_declare may yet return an
+ %% existing queue.
Finish =
- fun (Q) ->
- if ExclusiveDeclare ->
- case rabbit_amqqueue:claim_queue(Q, ReaderPid) of
- locked ->
- %% AMQP 0-8 doesn't say which
- %% exception to use, so we mimic QPid
- %% here.
- rabbit_misc:protocol_error(
- resource_locked,
- "cannot obtain exclusive access to locked ~s",
- [rabbit_misc:rs(Q#amqqueue.name)]);
- ok -> ok
- end;
- true ->
- ok
+ fun (#amqqueue{name = QueueName, exclusive_owner = Owner1} = Q)
+ when Owner =:= Owner1 ->
+ check_configure_permitted(QueueName, State),
+ %% We need to notify the reader within the channel
+ %% process so that we can be sure there are no
+ %% outstanding exclusive queues being declared as the
+ %% connection shuts down.
+ case Owner of
+ none -> ok;
+ _ -> ok = rabbit_reader_queue_collector:register_exclusive_queue(CollectorPid, Q)
end,
- Q
+ Q;
+ (#amqqueue{name = QueueName}) ->
+ rabbit_misc:protocol_error(
+ resource_locked,
+ "cannot obtain exclusive access to locked ~s",
+ [rabbit_misc:rs(QueueName)])
end,
Q = case rabbit_amqqueue:with(
rabbit_misc:r(VHostPath, queue, QueueNameBin),
@@ -713,34 +728,32 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
Other -> check_name('queue', Other)
end,
QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin),
- check_configure_permitted(QueueName, State),
- Finish(rabbit_amqqueue:declare(QueueName,
- Durable, AutoDelete, Args));
- Other = #amqqueue{name = QueueName} ->
- check_configure_permitted(QueueName, State),
+ Finish(rabbit_amqqueue:declare(QueueName, Durable, AutoDelete,
+ Args, Owner));
+ #amqqueue{} = Other ->
Other
end,
return_queue_declare_ok(State, NoWait, Q);
-handle_method(#'queue.declare'{queue = QueueNameBin,
+handle_method(#'queue.declare'{queue = QueueNameBin,
passive = true,
- nowait = NoWait},
- _, State = #ch{ virtual_host = VHostPath }) ->
+ nowait = NoWait},
+ _, State = #ch{virtual_host = VHostPath,
+ reader_pid = ReaderPid}) ->
QueueName = rabbit_misc:r(VHostPath, queue, QueueNameBin),
check_configure_permitted(QueueName, State),
- Q = rabbit_amqqueue:with_or_die(QueueName, fun (Q) -> Q end),
+ Q = with_exclusive_access_or_die(QueueName, ReaderPid, fun (Q) -> Q end),
return_queue_declare_ok(State, NoWait, Q);
handle_method(#'queue.delete'{queue = QueueNameBin,
if_unused = IfUnused,
if_empty = IfEmpty,
- nowait = NoWait
- },
- _, State) ->
+ nowait = NoWait},
+ _, State = #ch{reader_pid = ReaderPid}) ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
check_configure_permitted(QueueName, State),
- case rabbit_amqqueue:with_or_die(
- QueueName,
+ case with_exclusive_access_or_die(
+ QueueName, ReaderPid,
fun (Q) -> rabbit_amqqueue:delete(Q, IfUnused, IfEmpty) end) of
{error, in_use} ->
rabbit_misc:protocol_error(
@@ -750,8 +763,7 @@ handle_method(#'queue.delete'{queue = QueueNameBin,
precondition_failed, "~s not empty", [rabbit_misc:rs(QueueName)]);
{ok, PurgedMessageCount} ->
return_ok(State, NoWait,
- #'queue.delete_ok'{
- message_count = PurgedMessageCount})
+ #'queue.delete_ok'{message_count = PurgedMessageCount})
end;
handle_method(#'queue.bind'{queue = QueueNameBin,
@@ -773,11 +785,11 @@ handle_method(#'queue.unbind'{queue = QueueNameBin,
handle_method(#'queue.purge'{queue = QueueNameBin,
nowait = NoWait},
- _, State) ->
+ _, State = #ch{reader_pid = ReaderPid}) ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
check_read_permitted(QueueName, State),
- {ok, PurgedMessageCount} = rabbit_amqqueue:with_or_die(
- QueueName,
+ {ok, PurgedMessageCount} = with_exclusive_access_or_die(
+ QueueName, ReaderPid,
fun (Q) -> rabbit_amqqueue:purge(Q) end),
return_ok(State, NoWait,
#'queue.purge_ok'{message_count = PurgedMessageCount});
diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl
index b4fd91560f..a7ca20c80b 100644
--- a/src/rabbit_invariable_queue.erl
+++ b/src/rabbit_invariable_queue.erl
@@ -43,7 +43,7 @@
-include("rabbit.hrl").
--record(iv_state, { queue, qname, len, pending_ack }).
+-record(iv_state, { queue, qname, durable, len, pending_ack }).
-record(tx, { pending_messages, pending_acks, is_persistent }).
-ifdef(use_specs).
@@ -66,18 +66,23 @@ init(QName, IsDurable, Recover) ->
true -> rabbit_persister:queue_content(QName);
false -> []
end),
- #iv_state { queue = Q, qname = QName, len = queue:len(Q),
+ #iv_state { queue = Q,
+ qname = QName,
+ durable = IsDurable,
+ len = queue:len(Q),
pending_ack = dict:new() }.
terminate(State) ->
State #iv_state { queue = queue:new(), len = 0, pending_ack = dict:new() }.
-delete_and_terminate(State = #iv_state { qname = QName, pending_ack = PA }) ->
- ok = persist_acks(none, QName, dict:fetch_keys(PA), PA),
+delete_and_terminate(State = #iv_state { qname = QName, durable = IsDurable,
+ pending_ack = PA }) ->
+ ok = persist_acks(QName, IsDurable, none, dict:fetch_keys(PA), PA),
{_PLen, State1} = purge(State),
terminate(State1).
-purge(State = #iv_state { len = Len, queue = Q, qname = QName }) ->
+purge(State = #iv_state { queue = Q, qname = QName, durable = IsDurable,
+ len = Len }) ->
%% We do not purge messages pending acks.
{AckTags, PA} =
rabbit_misc:queue_fold(
@@ -85,57 +90,63 @@ purge(State = #iv_state { len = Len, queue = Q, qname = QName }) ->
Acc;
({Msg = #basic_message { guid = Guid }, IsDelivered},
{AckTagsN, PAN}) ->
- ok = persist_delivery(QName, Msg, IsDelivered),
+ ok = persist_delivery(QName, IsDurable, IsDelivered, Msg),
{[Guid | AckTagsN], dict:store(Guid, Msg, PAN)}
end, {[], dict:new()}, Q),
- ok = persist_acks(none, QName, AckTags, PA),
+ ok = persist_acks(QName, IsDurable, none, AckTags, PA),
{Len, State #iv_state { len = 0, queue = queue:new() }}.
-publish(Msg, State = #iv_state { queue = Q, qname = QName, len = Len }) ->
- ok = persist_message(none, QName, Msg),
+publish(Msg, State = #iv_state { queue = Q, qname = QName, durable = IsDurable,
+ len = Len }) ->
+ ok = persist_message(QName, IsDurable, none, Msg),
State #iv_state { queue = queue:in({Msg, false}, Q), len = Len + 1 }.
publish_delivered(false, _Msg, State) ->
{blank_ack, State};
publish_delivered(true, Msg = #basic_message { guid = Guid },
- State = #iv_state { qname = QName, len = 0,
- pending_ack = PA }) ->
- ok = persist_message(none, QName, Msg),
- ok = persist_delivery(QName, Msg, false),
+ State = #iv_state { qname = QName, durable = IsDurable,
+ len = 0, pending_ack = PA }) ->
+ ok = persist_message(QName, IsDurable, none, Msg),
+ ok = persist_delivery(QName, IsDurable, false, Msg),
{Guid, State #iv_state { pending_ack = dict:store(Guid, Msg, PA) }}.
fetch(_AckRequired, State = #iv_state { len = 0 }) ->
{empty, State};
-fetch(AckRequired, State = #iv_state { queue = Q, qname = QName, len = Len,
+fetch(AckRequired, State = #iv_state { len = Len, queue = Q, qname = QName,
+ durable = IsDurable,
pending_ack = PA }) ->
{{value, {Msg = #basic_message { guid = Guid }, IsDelivered}}, Q1} =
queue:out(Q),
Len1 = Len - 1,
- ok = persist_delivery(QName, Msg, IsDelivered),
+ ok = persist_delivery(QName, IsDurable, IsDelivered, Msg),
PA1 = dict:store(Guid, Msg, PA),
{AckTag, PA2} = case AckRequired of
true -> {Guid, PA1};
- false -> ok = persist_acks(none, QName, [Guid], PA1),
+ false -> ok = persist_acks(QName, IsDurable, none,
+ [Guid], PA1),
{blank_ack, PA}
end,
{{Msg, IsDelivered, AckTag, Len1},
State #iv_state { queue = Q1, len = Len1, pending_ack = PA2 }}.
-ack(AckTags, State = #iv_state { qname = QName, pending_ack = PA }) ->
- ok = persist_acks(none, QName, AckTags, PA),
+ack(AckTags, State = #iv_state { qname = QName, durable = IsDurable,
+ pending_ack = PA }) ->
+ ok = persist_acks(QName, IsDurable, none, AckTags, PA),
PA1 = remove_acks(AckTags, PA),
State #iv_state { pending_ack = PA1 }.
-tx_publish(Txn, Msg, State = #iv_state { qname = QName }) ->
+tx_publish(Txn, Msg, State = #iv_state { qname = QName,
+ durable = IsDurable }) ->
Tx = #tx { pending_messages = Pubs } = lookup_tx(Txn),
store_tx(Txn, Tx #tx { pending_messages = [Msg | Pubs] }),
- ok = persist_message(Txn, QName, Msg),
+ ok = persist_message(QName, IsDurable, Txn, Msg),
State.
-tx_ack(Txn, AckTags, State = #iv_state { qname = QName, pending_ack = PA }) ->
+tx_ack(Txn, AckTags, State = #iv_state { qname = QName, durable = IsDurable,
+ pending_ack = PA }) ->
Tx = #tx { pending_acks = Acks } = lookup_tx(Txn),
store_tx(Txn, Tx #tx { pending_acks = [AckTags | Acks] }),
- ok = persist_acks(Txn, QName, AckTags, PA),
+ ok = persist_acks(QName, IsDurable, Txn, AckTags, PA),
State.
tx_rollback(Txn, State = #iv_state { qname = QName }) ->
@@ -228,32 +239,33 @@ do_if_persistent(F, Txn, QName) ->
%%----------------------------------------------------------------------------
-persist_message(_Txn, _QName, #basic_message { is_persistent = false }) ->
- ok;
-persist_message(Txn, QName, Msg) ->
+persist_message(QName, true, Txn, Msg = #basic_message {
+ is_persistent = true }) ->
Msg1 = Msg #basic_message {
%% don't persist any recoverable decoded properties,
%% rebuild from properties_bin on restore
content = rabbit_binary_parser:clear_decoded_content(
Msg #basic_message.content)},
persist_work(Txn, QName,
- [{publish, Msg1, {QName, Msg1 #basic_message.guid}}]).
+ [{publish, Msg1, {QName, Msg1 #basic_message.guid}}]);
+persist_message(_QName, _IsDurable, _Txn, _Msg) ->
+ ok.
-persist_delivery(_QName, #basic_message { is_persistent = false },
- _IsDelivered) ->
- ok;
-persist_delivery(_QName, _Message, true) ->
- ok;
-persist_delivery(QName, #basic_message { guid = Guid }, _IsDelivered) ->
- persist_work(none, QName, [{deliver, {QName, Guid}}]).
+persist_delivery(QName, true, false, #basic_message { is_persistent = true,
+ guid = Guid }) ->
+ persist_work(none, QName, [{deliver, {QName, Guid}}]);
+persist_delivery(_QName, _IsDurable, _IsDelivered, _Msg) ->
+ ok.
-persist_acks(Txn, QName, AckTags, PA) ->
+persist_acks(QName, true, Txn, AckTags, PA) ->
persist_work(Txn, QName,
[{ack, {QName, Guid}} || Guid <- AckTags,
begin
{ok, Msg} = dict:find(Guid, PA),
Msg #basic_message.is_persistent
- end]).
+ end]);
+persist_acks(_QName, _IsDurable, _Txn, _AckTags, _PA) ->
+ ok.
persist_work(_Txn,_QName, []) ->
ok;
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 723b818b41..9a911ab15d 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -537,18 +537,24 @@ pid_to_string(Pid) when is_pid(Pid) ->
%% inverse of above
string_to_pid(Str) ->
+ Err = {error, {invalid_pid_syntax, Str}},
%% The \ before the trailing $ is only there to keep emacs
%% font-lock from getting confused.
case re:run(Str, "^<(.*)\\.([0-9]+)\\.([0-9]+)>\$",
[{capture,all_but_first,list}]) of
{match, [NodeStr, IdStr, SerStr]} ->
- %% turn the triple into a pid - see pid_to_string
- <<131,NodeEnc/binary>> = term_to_binary(list_to_atom(NodeStr)),
+ %% the NodeStr atom might be quoted, so we have to parse
+ %% it rather than doing a simple list_to_atom
+ NodeAtom = case erl_scan:string(NodeStr) of
+ {ok, [{atom, _, X}], _} -> X;
+ {error, _, _} -> throw(Err)
+ end,
+ <<131,NodeEnc/binary>> = term_to_binary(NodeAtom),
Id = list_to_integer(IdStr),
Ser = list_to_integer(SerStr),
binary_to_term(<<131,103,NodeEnc/binary,Id:32,Ser:32,0:8>>);
nomatch ->
- throw({error, {invalid_pid_syntax, Str}})
+ throw(Err)
end.
version_compare(A, B, lte) ->
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index c6bd2973e7..73a58f1328 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -52,11 +52,12 @@
-define(NORMAL_TIMEOUT, 3).
-define(CLOSING_TIMEOUT, 1).
-define(CHANNEL_TERMINATION_TIMEOUT, 3).
--define(SLEEP_BEFORE_SILENT_CLOSE, 3000).
+-define(SILENT_CLOSE_DELAY, 3).
%---------------------------------------------------------------------------
--record(v1, {sock, connection, callback, recv_ref, connection_state}).
+-record(v1, {sock, connection, callback, recv_ref, connection_state,
+ queue_collector}).
-define(INFO_KEYS,
[pid, address, port, peer_address, peer_port,
@@ -234,6 +235,7 @@ start_connection(Parent, Deb, Sock, SockTransform) ->
erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(),
handshake_timeout),
ProfilingValue = setup_profiling(),
+ {ok, Collector} = rabbit_reader_queue_collector:start_link(),
try
mainloop(Parent, Deb, switch_callback(
#v1{sock = ClientSock,
@@ -245,7 +247,8 @@ start_connection(Parent, Deb, Sock, SockTransform) ->
client_properties = none},
callback = uninitialized_callback,
recv_ref = none,
- connection_state = pre_init},
+ connection_state = pre_init,
+ queue_collector = Collector},
handshake, 8))
catch
Ex -> (if Ex == connection_closed_abruptly ->
@@ -263,7 +266,9 @@ start_connection(Parent, Deb, Sock, SockTransform) ->
%% output to be sent, which results in unnecessary delays.
%%
%% gen_tcp:close(ClientSock),
- teardown_profiling(ProfilingValue)
+ teardown_profiling(ProfilingValue),
+ rabbit_reader_queue_collector:shutdown(Collector),
+ rabbit_misc:unlink_and_capture_exit(Collector)
end,
done.
@@ -426,11 +431,17 @@ wait_for_channel_termination(N, TimerRef) ->
exit(channel_termination_timeout)
end.
-maybe_close(State = #v1{connection_state = closing}) ->
+maybe_close(State = #v1{connection_state = closing,
+ queue_collector = Collector}) ->
case all_channels() of
- [] -> ok = send_on_channel0(
- State#v1.sock, #'connection.close_ok'{}),
- close_connection(State);
+ [] ->
+ %% Spec says "Exclusive queues may only be accessed by the current
+ %% connection, and are deleted when that connection closes."
+ %% This does not strictly imply synchrony, but in practice it seems
+ %% to be what people assume.
+ rabbit_reader_queue_collector:delete_all(Collector),
+ ok = send_on_channel0(State#v1.sock, #'connection.close_ok'{}),
+ close_connection(State);
_ -> State
end;
maybe_close(State) ->
@@ -579,7 +590,7 @@ handle_method0(MethodName, FieldsBin, State) ->
%% We don't trust the client at this point - force
%% them to wait for a bit so they can't DOS us with
%% repeated failed logins etc.
- Other -> timer:sleep(?SLEEP_BEFORE_SILENT_CLOSE),
+ Other -> timer:sleep(?SILENT_CLOSE_DELAY * 1000),
throw({channel0_error, Other, CompleteReason})
end
end.
@@ -727,15 +738,16 @@ i(Item, #v1{}) ->
%%--------------------------------------------------------------------------
-send_to_new_channel(Channel, AnalyzedFrame, State) ->
+send_to_new_channel(Channel, AnalyzedFrame,
+ State = #v1{queue_collector = Collector}) ->
#v1{sock = Sock, connection = #connection{
frame_max = FrameMax,
user = #user{username = Username},
vhost = VHost}} = State,
WriterPid = rabbit_writer:start(Sock, Channel, FrameMax),
ChPid = rabbit_framing_channel:start_link(
- fun rabbit_channel:start_link/5,
- [Channel, self(), WriterPid, Username, VHost]),
+ fun rabbit_channel:start_link/6,
+ [Channel, self(), WriterPid, Username, VHost, Collector]),
put({channel, Channel}, {chpid, ChPid}),
put({chpid, ChPid}, {channel, Channel}),
ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame).
diff --git a/src/rabbit_reader_queue_collector.erl b/src/rabbit_reader_queue_collector.erl
new file mode 100644
index 0000000000..841549e98f
--- /dev/null
+++ b/src/rabbit_reader_queue_collector.erl
@@ -0,0 +1,108 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_reader_queue_collector).
+
+-behaviour(gen_server).
+
+-export([start_link/0, register_exclusive_queue/2, delete_all/1, shutdown/1]).
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-record(state, {exclusive_queues}).
+
+-include("rabbit.hrl").
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(start_link/0 :: () -> {'ok', pid()}).
+-spec(register_exclusive_queue/2 :: (pid(), amqqueue()) -> 'ok').
+-spec(delete_all/1 :: (pid()) -> 'ok').
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+start_link() ->
+ gen_server:start_link(?MODULE, [], []).
+
+register_exclusive_queue(CollectorPid, Q) ->
+ gen_server:call(CollectorPid, {register_exclusive_queue, Q}, infinity).
+
+delete_all(CollectorPid) ->
+ gen_server:call(CollectorPid, delete_all, infinity).
+
+shutdown(CollectorPid) ->
+ gen_server:call(CollectorPid, shutdown, infinity).
+
+%%----------------------------------------------------------------------------
+
+init([]) ->
+ {ok, #state{exclusive_queues = dict:new()}}.
+
+%%--------------------------------------------------------------------------
+
+handle_call({register_exclusive_queue, Q}, _From,
+ State = #state{exclusive_queues = Queues}) ->
+ MonitorRef = erlang:monitor(process, Q#amqqueue.pid),
+ {reply, ok,
+ State#state{exclusive_queues = dict:store(MonitorRef, Q, Queues)}};
+
+handle_call(delete_all, _From,
+ State = #state{exclusive_queues = ExclusiveQueues}) ->
+ [rabbit_misc:with_exit_handler(
+ fun() -> ok end,
+ fun() ->
+ erlang:demonitor(MonitorRef),
+ rabbit_amqqueue:delete(Q, false, false)
+ end)
+ || {MonitorRef, Q} <- dict:to_list(ExclusiveQueues)],
+ {reply, ok, State};
+
+handle_call(shutdown, _From, State) ->
+ {stop, normal, ok, State}.
+
+handle_cast(_Msg, State) ->
+ {noreply, State}.
+
+handle_info({'DOWN', MonitorRef, process, _DownPid, _Reason},
+ State = #state{exclusive_queues = ExclusiveQueues}) ->
+ {noreply, State#state{exclusive_queues =
+ dict:erase(MonitorRef, ExclusiveQueues)}}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index e1ee5d080c..04ee96d8a1 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -909,17 +909,16 @@ test_user_management() ->
passed.
test_server_status() ->
-
%% create a few things so there is some useful information to list
Writer = spawn(fun () -> receive shutdown -> ok end end),
- Ch = rabbit_channel:start_link(1, self(), Writer, <<"user">>, <<"/">>),
+ Ch = rabbit_channel:start_link(1, self(), Writer, <<"user">>, <<"/">>,
+ self()),
[Q, Q2] = [#amqqueue{} = rabbit_amqqueue:declare(
rabbit_misc:r(<<"/">>, queue, Name),
- false, false, []) ||
+ false, false, [], none) ||
Name <- [<<"foo">>, <<"bar">>]],
- ok = rabbit_amqqueue:claim_queue(Q, self()),
- ok = rabbit_amqqueue:basic_consume(Q, true, self(), Ch, undefined,
+ ok = rabbit_amqqueue:basic_consume(Q, true, Ch, undefined,
<<"ctag">>, true, undefined),
%% list queues
@@ -1020,10 +1019,11 @@ test_delegates_async(SecondaryNode) ->
passed.
-make_responder(FMsg) ->
+make_responder(FMsg) -> make_responder(FMsg, timeout).
+make_responder(FMsg, Throw) ->
fun() ->
receive Msg -> FMsg(Msg)
- after 1000 -> throw(timeout)
+ after 1000 -> throw(Throw)
end
end.
@@ -1057,17 +1057,21 @@ test_delegates_sync(SecondaryNode) ->
gen_server:reply(From, response)
end),
+ BadResponder = make_responder(fun({'$gen_call', From, invoked}) ->
+ gen_server:reply(From, response)
+ end, bad_responder_died),
+
response = delegate:invoke(spawn(Responder), Sender),
response = delegate:invoke(spawn(SecondaryNode, Responder), Sender),
- must_exit(fun() -> delegate:invoke(spawn(Responder), BadSender) end),
+ must_exit(fun() -> delegate:invoke(spawn(BadResponder), BadSender) end),
must_exit(fun() ->
- delegate:invoke(spawn(SecondaryNode, Responder), BadSender) end),
+ delegate:invoke(spawn(SecondaryNode, BadResponder), BadSender) end),
LocalGoodPids = spawn_responders(node(), Responder, 2),
RemoteGoodPids = spawn_responders(SecondaryNode, Responder, 2),
- LocalBadPids = spawn_responders(node(), Responder, 2),
- RemoteBadPids = spawn_responders(SecondaryNode, Responder, 2),
+ LocalBadPids = spawn_responders(node(), BadResponder, 2),
+ RemoteBadPids = spawn_responders(SecondaryNode, BadResponder, 2),
{GoodRes, []} = delegate:invoke(LocalGoodPids ++ RemoteGoodPids, Sender),
true = lists:all(fun ({_, response}) -> true end, GoodRes),