summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-06-01 14:01:49 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2010-06-01 14:01:49 +0100
commit5b5301e8ba3b18eadd31e9dfecbd3a1e784d8710 (patch)
treec4d006378f57deb81480ed8e429725f4caaa5ae5 /src
parenta1d9c6d1faf7de49ab9f6d3681eef89d667cf742 (diff)
parente31bf3c052e4b8bfae5594e4da14d4185dbdf6a4 (diff)
downloadrabbitmq-server-git-5b5301e8ba3b18eadd31e9dfecbd3a1e784d8710.tar.gz
Merging in from default
Diffstat (limited to 'src')
-rw-r--r--src/delegate.erl55
-rw-r--r--src/gen_server2.erl2
-rw-r--r--src/rabbit_amqqueue.erl52
-rw-r--r--src/rabbit_amqqueue_process.erl190
-rw-r--r--src/rabbit_channel.erl136
-rw-r--r--src/rabbit_control.erl4
-rw-r--r--src/rabbit_exchange.erl13
-rw-r--r--src/rabbit_invariable_queue.erl82
-rw-r--r--src/rabbit_misc.erl18
-rw-r--r--src/rabbit_mnesia.erl2
-rw-r--r--src/rabbit_multi.erl4
-rw-r--r--src/rabbit_net.erl2
-rw-r--r--src/rabbit_persister.erl2
-rw-r--r--src/rabbit_reader.erl36
-rw-r--r--src/rabbit_reader_queue_collector.erl108
-rw-r--r--src/rabbit_router.erl11
-rw-r--r--src/rabbit_tests.erl82
-rw-r--r--src/supervisor2.erl8
18 files changed, 475 insertions, 332 deletions
diff --git a/src/delegate.erl b/src/delegate.erl
index 12eb814f8f..8af2812781 100644
--- a/src/delegate.erl
+++ b/src/delegate.erl
@@ -45,8 +45,8 @@
-ifdef(use_specs).
-spec(start_link/1 :: (non_neg_integer()) -> {'ok', pid()}).
--spec(invoke_no_result/2 :: (pid() | [pid()], fun((pid()) -> any())) -> 'ok').
--spec(invoke/2 :: (pid() | [pid()], fun((pid()) -> A)) -> A).
+-spec(invoke_no_result/2 :: (pid() | [pid()], fun ((pid()) -> any())) -> 'ok').
+-spec(invoke/2 :: (pid() | [pid()], fun ((pid()) -> A)) -> A).
-spec(process_count/0 :: () -> non_neg_integer()).
@@ -63,7 +63,7 @@ start_link(Hash) ->
gen_server2:start_link({local, server(Hash)}, ?MODULE, [], []).
invoke(Pid, Fun) when is_pid(Pid) ->
- [Res] = invoke_per_node([{node(Pid), [Pid]}], Fun),
+ [Res] = invoke_per_node(split_delegate_per_node([Pid]), Fun),
case Res of
{ok, Result, _} ->
Result;
@@ -73,7 +73,7 @@ invoke(Pid, Fun) when is_pid(Pid) ->
invoke(Pids, Fun) when is_list(Pids) ->
lists:foldl(
- fun({Status, Result, Pid}, {Good, Bad}) ->
+ fun ({Status, Result, Pid}, {Good, Bad}) ->
case Status of
ok -> {[{Pid, Result}|Good], Bad};
error -> {Good, [{Pid, Result}|Bad]}
@@ -83,7 +83,7 @@ invoke(Pids, Fun) when is_list(Pids) ->
invoke_per_node(split_delegate_per_node(Pids), Fun)).
invoke_no_result(Pid, Fun) when is_pid(Pid) ->
- invoke_no_result_per_node([{node(Pid), [Pid]}], Fun),
+ invoke_no_result_per_node(split_delegate_per_node([Pid]), Fun),
ok;
invoke_no_result(Pids, Fun) when is_list(Pids) ->
@@ -99,42 +99,47 @@ internal_cast(Node, Thunk) when is_atom(Node) ->
gen_server2:cast({remote_server(Node), Node}, {thunk, Thunk}).
split_delegate_per_node(Pids) ->
- orddict:to_list(
- lists:foldl(
- fun (Pid, D) ->
- orddict:update(node(Pid),
- fun (Pids1) -> [Pid | Pids1] end,
- [Pid], D)
- end,
- orddict:new(), Pids)).
+ LocalNode = node(),
+ {Local, Remote} =
+ lists:foldl(
+ fun (Pid, {L, D}) ->
+ Node = node(Pid),
+ case Node of
+ LocalNode -> {[Pid|L], D};
+ _ -> {L, orddict:append(Node, Pid, D)}
+ end
+ end,
+ {[], orddict:new()}, Pids),
+ {Local, orddict:to_list(Remote)}.
-invoke_per_node([{Node, Pids}], Fun) when Node == node() ->
- safe_invoke(Pids, Fun);
invoke_per_node(NodePids, Fun) ->
lists:append(delegate_per_node(NodePids, Fun, fun internal_call/2)).
-invoke_no_result_per_node([{Node, Pids}], Fun) when Node == node() ->
- %% This is not actually async! However, in practice Fun will
- %% always be something that does a gen_server:cast or similar, so
- %% I don't think it's a problem unless someone misuses this
- %% function. Making this *actually* async would be painful as we
- %% can't spawn at this point or we break effect ordering.
- safe_invoke(Pids, Fun);
invoke_no_result_per_node(NodePids, Fun) ->
delegate_per_node(NodePids, Fun, fun internal_cast/2),
ok.
-delegate_per_node(NodePids, Fun, DelegateFun) ->
+delegate_per_node({LocalPids, NodePids}, Fun, DelegateFun) ->
+ %% In the case where DelegateFun is internal_cast, the safe_invoke
+ %% is not actually async! However, in practice Fun will always be
+ %% something that does a gen_server:cast or similar, so I don't
+ %% think it's a problem unless someone misuses this
+ %% function. Making this *actually* async would be painful as we
+ %% can't spawn at this point or we break effect ordering.
+ [safe_invoke(LocalPids, Fun)|
+ delegate_per_remote_node(NodePids, Fun, DelegateFun)].
+
+delegate_per_remote_node(NodePids, Fun, DelegateFun) ->
Self = self(),
%% Note that this is unsafe if the Fun requires reentrancy to the
%% local_server. I.e. if self() == local_server(Node) then we'll
%% block forever.
[gen_server2:cast(
local_server(Node),
- {thunk, fun() ->
+ {thunk, fun () ->
Self ! {result,
DelegateFun(
- Node, fun() -> safe_invoke(Pids, Fun) end)}
+ Node, fun () -> safe_invoke(Pids, Fun) end)}
end}) || {Node, Pids} <- NodePids],
[receive {result, Result} -> Result end || _ <- NodePids].
diff --git a/src/gen_server2.erl b/src/gen_server2.erl
index 5b899cdbc7..547f0a42e2 100644
--- a/src/gen_server2.erl
+++ b/src/gen_server2.erl
@@ -639,7 +639,7 @@ do_multi_call(Nodes, Name, Req, Timeout) ->
Caller = self(),
Receiver =
spawn(
- fun() ->
+ fun () ->
%% Middleman process. Should be unsensitive to regular
%% exit signals. The sychronization is needed in case
%% the receiver would exit before the caller started
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 7b88c45d26..1756640a90 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -31,7 +31,7 @@
-module(rabbit_amqqueue).
--export([start/0, declare/4, delete/3, purge/1]).
+-export([start/0, declare/5, delete/3, purge/1]).
-export([internal_declare/2, internal_delete/1,
maybe_run_queue_via_backing_queue/2,
update_ram_duration/1, set_ram_duration_target/2,
@@ -41,8 +41,7 @@
stat/1, stat_all/0, deliver/2, requeue/3, ack/4]).
-export([list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]).
-export([consumers/1, consumers_all/1]).
--export([claim_queue/2]).
--export([basic_get/3, basic_consume/8, basic_cancel/4]).
+-export([basic_get/3, basic_consume/7, basic_cancel/4]).
-export([notify_sent/2, unblock/2, flush_all/2]).
-export([commit_all/3, rollback_all/3, notify_down_all/2, limit_all/3]).
-export([on_node_down/1]).
@@ -66,8 +65,8 @@
'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}).
-spec(start/0 :: () -> 'ok').
--spec(declare/4 :: (queue_name(), boolean(), boolean(), amqp_table()) ->
- amqqueue()).
+-spec(declare/5 :: (queue_name(), boolean(), boolean(), amqp_table(),
+ maybe(pid())) -> amqqueue()).
-spec(lookup/1 :: (queue_name()) -> {'ok', amqqueue()} | not_found()).
-spec(with/2 :: (queue_name(), qfun(A)) -> A | not_found()).
-spec(with_or_die/2 :: (queue_name(), qfun(A)) -> A).
@@ -97,14 +96,12 @@
-spec(rollback_all/3 :: ([pid()], txn(), pid()) -> 'ok').
-spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()).
-spec(limit_all/3 :: ([pid()], pid(), pid() | 'undefined') -> ok_or_errors()).
--spec(claim_queue/2 :: (amqqueue(), pid()) -> 'ok' | 'locked').
-spec(basic_get/3 :: (amqqueue(), pid(), boolean()) ->
{'ok', non_neg_integer(), qmsg()} | 'empty').
--spec(basic_consume/8 ::
- (amqqueue(), boolean(), pid(), pid(), pid() | 'undefined', ctag(),
+-spec(basic_consume/7 ::
+ (amqqueue(), boolean(), pid(), pid() | 'undefined', ctag(),
boolean(), any()) ->
- 'ok' | {'error', 'queue_owned_by_another_connection' |
- 'exclusive_consume_unavailable'}).
+ 'ok' | {'error', 'exclusive_consume_unavailable'}).
-spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok').
-spec(notify_sent/2 :: (pid(), pid()) -> 'ok').
-spec(unblock/2 :: (pid(), pid()) -> 'ok').
@@ -148,11 +145,12 @@ recover_durable_queues(DurableQueues) ->
Qs = [start_queue_process(Q) || Q <- DurableQueues],
[Q || Q <- Qs, gen_server2:call(Q#amqqueue.pid, {init, true}) == Q].
-declare(QueueName, Durable, AutoDelete, Args) ->
+declare(QueueName, Durable, AutoDelete, Args, Owner) ->
Q = start_queue_process(#amqqueue{name = QueueName,
durable = Durable,
auto_delete = AutoDelete,
arguments = Args,
+ exclusive_owner = Owner,
pid = none}),
case gen_server2:call(Q#amqqueue.pid, {init, false}) of
not_found -> rabbit_misc:not_found(QueueName);
@@ -298,15 +296,12 @@ limit_all(QPids, ChPid, LimiterPid) ->
gen_server2:cast(QPid, {limit, ChPid, LimiterPid})
end).
-claim_queue(#amqqueue{pid = QPid}, ReaderPid) ->
- delegate_call(QPid, {claim_queue, ReaderPid}, infinity).
-
basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) ->
delegate_call(QPid, {basic_get, ChPid, NoAck}, infinity).
-basic_consume(#amqqueue{pid = QPid}, NoAck, ReaderPid, ChPid, LimiterPid,
+basic_consume(#amqqueue{pid = QPid}, NoAck, ChPid, LimiterPid,
ConsumerTag, ExclusiveConsume, OkMsg) ->
- delegate_call(QPid, {basic_consume, NoAck, ReaderPid, ChPid,
+ delegate_call(QPid, {basic_consume, NoAck, ChPid,
LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg},
infinity).
@@ -324,19 +319,21 @@ flush_all(QPids, ChPid) ->
delegate:invoke_no_result(
QPids, fun (QPid) -> gen_server2:cast(QPid, {flush, ChPid}) end).
+internal_delete1(QueueName) ->
+ ok = mnesia:delete({rabbit_queue, QueueName}),
+ ok = mnesia:delete({rabbit_durable_queue, QueueName}),
+ %% we want to execute some things, as
+ %% decided by rabbit_exchange, after the
+ %% transaction.
+ rabbit_exchange:delete_queue_bindings(QueueName).
+
internal_delete(QueueName) ->
case
rabbit_misc:execute_mnesia_transaction(
fun () ->
case mnesia:wread({rabbit_queue, QueueName}) of
[] -> {error, not_found};
- [_] ->
- ok = mnesia:delete({rabbit_queue, QueueName}),
- ok = mnesia:delete({rabbit_durable_queue, QueueName}),
- %% we want to execute some things, as
- %% decided by rabbit_exchange, after the
- %% transaction.
- rabbit_exchange:delete_queue_bindings(QueueName)
+ [_] -> internal_delete1(QueueName)
end
end) of
Err = {error, _} -> Err;
@@ -394,15 +391,16 @@ safe_delegate_call_ok(H, F, Pids) ->
end.
delegate_call(Pid, Msg, Timeout) ->
- delegate:invoke(Pid, fun(P) -> gen_server2:call(P, Msg, Timeout) end).
+ delegate:invoke(Pid, fun (P) -> gen_server2:call(P, Msg, Timeout) end).
delegate_pcall(Pid, Pri, Msg, Timeout) ->
- delegate:invoke(Pid, fun(P) -> gen_server2:pcall(P, Pri, Msg, Timeout) end).
+ delegate:invoke(Pid,
+ fun (P) -> gen_server2:pcall(P, Pri, Msg, Timeout) end).
delegate_cast(Pid, Msg) ->
- delegate:invoke_no_result(Pid, fun(P) -> gen_server2:cast(P, Msg) end).
+ delegate:invoke_no_result(Pid, fun (P) -> gen_server2:cast(P, Msg) end).
delegate_pcast(Pid, Pri, Msg) ->
delegate:invoke_no_result(Pid,
- fun(P) -> gen_server2:pcast(P, Pri, Msg) end).
+ fun (P) -> gen_server2:pcast(P, Pri, Msg) end).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index f12e1b70f8..3283cb6679 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -50,7 +50,6 @@
% Queue's state
-record(q, {q,
- owner,
exclusive_consumer,
has_had_consumers,
backing_queue,
@@ -104,7 +103,6 @@ init(Q) ->
{ok, BQ} = application:get_env(backing_queue_module),
{ok, #q{q = Q#amqqueue{pid = self()},
- owner = none,
exclusive_consumer = none,
has_had_consumers = false,
backing_queue = BQ,
@@ -134,6 +132,23 @@ code_change(_OldVsn, State, _Extra) ->
%%----------------------------------------------------------------------------
+declare(Recover, From,
+ State = #q{q = Q = #amqqueue{name = QName, durable = IsDurable},
+ backing_queue = BQ, backing_queue_state = undefined}) ->
+ case rabbit_amqqueue:internal_declare(Q, Recover) of
+ not_found -> {stop, normal, not_found, State};
+ Q -> gen_server2:reply(From, Q),
+ ok = file_handle_cache:register_callback(
+ rabbit_amqqueue, set_maximum_since_use,
+ [self()]),
+ ok = rabbit_memory_monitor:register(
+ self(), {rabbit_amqqueue,
+ set_ram_duration_target, [self()]}),
+ BQS = BQ:init(QName, IsDurable, Recover),
+ noreply(State#q{backing_queue_state = BQS});
+ Q1 -> {stop, normal, Q1, State}
+ end.
+
terminate_shutdown(Fun, State) ->
State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
stop_sync_timer(stop_rate_timer(State)),
@@ -433,10 +448,6 @@ cancel_holder(ChPid, ConsumerTag, {ChPid, ConsumerTag}) ->
cancel_holder(_ChPid, _ConsumerTag, Holder) ->
Holder.
-check_queue_owner(none, _) -> ok;
-check_queue_owner({ReaderPid, _}, ReaderPid) -> ok;
-check_queue_owner({_, _}, _) -> mismatch.
-
check_exclusive_access({_ChPid, _ConsumerTag}, _ExclusiveConsume, _State) ->
in_use;
check_exclusive_access(none, false, _State) ->
@@ -488,10 +499,10 @@ i(auto_delete, #q{q = #amqqueue{auto_delete = AutoDelete}}) -> AutoDelete;
i(arguments, #q{q = #amqqueue{arguments = Arguments}}) -> Arguments;
i(pid, _) ->
self();
-i(owner_pid, #q{owner = none}) ->
+i(owner_pid, #q{q = #amqqueue{exclusive_owner = none}}) ->
'';
-i(owner_pid, #q{owner = {ReaderPid, _MonitorRef}}) ->
- ReaderPid;
+i(owner_pid, #q{q = #amqqueue{exclusive_owner = ExclusiveOwner}}) ->
+ ExclusiveOwner;
i(exclusive_consumer_pid, #q{exclusive_consumer = none}) ->
'';
i(exclusive_consumer_pid, #q{exclusive_consumer = {ChPid, _ConsumerTag}}) ->
@@ -520,25 +531,24 @@ i(Item, _) ->
%---------------------------------------------------------------------------
handle_call({init, Recover}, From,
- State = #q{q = Q = #amqqueue{name = QName, durable = IsDurable},
- backing_queue = BQ, backing_queue_state = undefined}) ->
- %% TODO: If we're exclusively owned && our owner isn't alive &&
- %% Recover then we should BQ:init and then {stop, normal,
- %% not_found, State}, relying on terminate to delete the queue.
- case rabbit_amqqueue:internal_declare(Q, Recover) of
- not_found ->
- {stop, normal, not_found, State};
- Q ->
- gen_server2:reply(From, Q),
- ok = file_handle_cache:register_callback(
- rabbit_amqqueue, set_maximum_since_use, [self()]),
- ok = rabbit_memory_monitor:register(
- self(),
- {rabbit_amqqueue, set_ram_duration_target, [self()]}),
- noreply(State#q{backing_queue_state =
- BQ:init(QName, IsDurable, Recover)});
- Q1 ->
- {stop, normal, Q1, State}
+ State = #q{q = #amqqueue{exclusive_owner = none}}) ->
+ declare(Recover, From, State);
+
+handle_call({init, Recover}, From,
+ State = #q{q = #amqqueue{exclusive_owner = Owner}}) ->
+ case rpc:call(node(Owner), erlang, is_process_alive, [Owner]) of
+ true -> erlang:monitor(process, Owner),
+ declare(Recover, From, State);
+ _ -> #q{q = #amqqueue{name = QName, durable = IsDurable},
+ backing_queue = BQ, backing_queue_state = undefined} = State,
+ case Recover of
+ true -> ok;
+ _ -> rabbit_log:warning(
+ "Queue ~p exclusive owner went away~n", [QName])
+ end,
+ BQS = BQ:init(QName, IsDurable, Recover),
+ %% Rely on terminate to delete the queue.
+ {stop, normal, not_found, State#q{backing_queue_state = BQS}}
end;
handle_call(info, _From, State) ->
@@ -613,51 +623,44 @@ handle_call({basic_get, ChPid, NoAck}, _From,
reply({ok, Remaining, Msg}, State#q{backing_queue_state = BQS1})
end;
-handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid,
+handle_call({basic_consume, NoAck, ChPid, LimiterPid,
ConsumerTag, ExclusiveConsume, OkMsg},
- _From, State = #q{owner = Owner,
- exclusive_consumer = ExistingHolder}) ->
- case check_queue_owner(Owner, ReaderPid) of
- mismatch ->
- reply({error, queue_owned_by_another_connection}, State);
+ _From, State = #q{exclusive_consumer = ExistingHolder}) ->
+ case check_exclusive_access(ExistingHolder, ExclusiveConsume,
+ State) of
+ in_use ->
+ reply({error, exclusive_consume_unavailable}, State);
ok ->
- case check_exclusive_access(ExistingHolder, ExclusiveConsume,
- State) of
- in_use ->
- reply({error, exclusive_consume_unavailable}, State);
- ok ->
- C = #cr{consumer_count = ConsumerCount} = ch_record(ChPid),
- Consumer = #consumer{tag = ConsumerTag,
- ack_required = not NoAck},
- store_ch_record(C#cr{consumer_count = ConsumerCount +1,
- limiter_pid = LimiterPid}),
- case ConsumerCount of
- 0 -> ok = rabbit_limiter:register(LimiterPid, self());
- _ -> ok
- end,
- ExclusiveConsumer = case ExclusiveConsume of
- true -> {ChPid, ConsumerTag};
- false -> ExistingHolder
- end,
- State1 = State#q{has_had_consumers = true,
- exclusive_consumer = ExclusiveConsumer},
- ok = maybe_send_reply(ChPid, OkMsg),
- State2 =
- case is_ch_blocked(C) of
- true -> State1#q{
- blocked_consumers =
- add_consumer(
- ChPid, Consumer,
- State1#q.blocked_consumers)};
- false -> run_message_queue(
- State1#q{
- active_consumers =
- add_consumer(
- ChPid, Consumer,
- State1#q.active_consumers)})
- end,
- reply(ok, State2)
- end
+ C = #cr{consumer_count = ConsumerCount} = ch_record(ChPid),
+ Consumer = #consumer{tag = ConsumerTag,
+ ack_required = not NoAck},
+ store_ch_record(C#cr{consumer_count = ConsumerCount +1,
+ limiter_pid = LimiterPid}),
+ ok = case ConsumerCount of
+ 0 -> rabbit_limiter:register(LimiterPid, self());
+ _ -> ok
+ end,
+ ExclusiveConsumer = if ExclusiveConsume -> {ChPid, ConsumerTag};
+ true -> ExistingHolder
+ end,
+ State1 = State#q{has_had_consumers = true,
+ exclusive_consumer = ExclusiveConsumer},
+ ok = maybe_send_reply(ChPid, OkMsg),
+ State2 =
+ case is_ch_blocked(C) of
+ true -> State1#q{
+ blocked_consumers =
+ add_consumer(
+ ChPid, Consumer,
+ State1#q.blocked_consumers)};
+ false -> run_message_queue(
+ State1#q{
+ active_consumers =
+ add_consumer(
+ ChPid, Consumer,
+ State1#q.active_consumers)})
+ end,
+ reply(ok, State2)
end;
handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
@@ -713,29 +716,6 @@ handle_call(purge, _From, State = #q{backing_queue = BQ,
{Count, BQS1} = BQ:purge(BQS),
reply({ok, Count}, State#q{backing_queue_state = BQS1});
-handle_call({claim_queue, ReaderPid}, _From,
- State = #q{owner = Owner, exclusive_consumer = Holder}) ->
- case Owner of
- none ->
- case check_exclusive_access(Holder, true, State) of
- in_use ->
- %% FIXME: Is this really the right answer? What if
- %% an active consumer's reader is actually the
- %% claiming pid? Should that be allowed? In order
- %% to check, we'd need to hold not just the ch
- %% pid for each consumer, but also its reader
- %% pid...
- reply(locked, State);
- ok ->
- MonitorRef = erlang:monitor(process, ReaderPid),
- reply(ok, State#q{owner = {ReaderPid, MonitorRef}})
- end;
- {ReaderPid, _MonitorRef} ->
- reply(ok, State);
- _ ->
- reply(locked, State)
- end;
-
handle_call({maybe_run_queue_via_backing_queue, Fun}, _From, State) ->
reply(ok, maybe_run_queue_via_backing_queue(Fun, State)).
@@ -825,19 +805,15 @@ handle_cast({set_maximum_since_use, Age}, State) ->
ok = file_handle_cache:set_maximum_since_use(Age),
noreply(State).
-handle_info({'DOWN', MonitorRef, process, DownPid, _Reason},
- State = #q{owner = {DownPid, MonitorRef}}) ->
- %% We know here that there are no consumers on this queue that are
- %% owned by other pids than the one that just went down, so since
- %% exclusive in some sense implies autodelete, we delete the queue
- %% here. The other way of implementing the "exclusive implies
- %% autodelete" feature is to actually set autodelete when an
- %% exclusive declaration is seen, but this has the problem that
- %% the python tests rely on the queue not going away after a
- %% basic.cancel when the queue was declared exclusive and
- %% nonautodelete.
- NewState = State#q{owner = none},
- {stop, normal, NewState};
+handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason},
+ State = #q{q = #amqqueue{exclusive_owner = DownPid}}) ->
+ %% Exclusively owned queues must disappear with their owner. In
+ %% the case of clean shutdown we delete the queue synchronously in
+ %% the reader - although not required by the spec this seems to
+ %% match what people expect (see bug 21824). However we need this
+ %% monitor-and-async- delete in case the connection goes away
+ %% unexpectedly.
+ {stop, normal, State};
handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) ->
case handle_ch_down(DownPid, State) of
{ok, NewState} -> noreply(NewState);
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 0bc7fa09ba..d53711e8d9 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -35,7 +35,7 @@
-behaviour(gen_server2).
--export([start_link/5, do/2, do/3, shutdown/1]).
+-export([start_link/6, do/2, do/3, shutdown/1]).
-export([send_command/2, deliver/4, conserve_memory/2, flushed/2]).
-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]).
@@ -48,7 +48,7 @@
transaction_id, tx_participants, next_tag,
uncommitted_ack_q, unacked_message_q,
username, virtual_host, most_recently_declared_queue,
- consumer_mapping, blocking, flow}).
+ consumer_mapping, blocking, queue_collector_pid, flow}).
-record(flow, {server, client, pending}).
@@ -73,8 +73,8 @@
-type(ref() :: any()).
--spec(start_link/5 ::
- (channel_number(), pid(), pid(), username(), vhost()) -> pid()).
+-spec(start_link/6 ::
+ (channel_number(), pid(), pid(), username(), vhost(), pid()) -> pid()).
-spec(do/2 :: (pid(), amqp_method()) -> 'ok').
-spec(do/3 :: (pid(), amqp_method(), maybe(content())) -> 'ok').
-spec(shutdown/1 :: (pid()) -> 'ok').
@@ -94,10 +94,10 @@
%%----------------------------------------------------------------------------
-start_link(Channel, ReaderPid, WriterPid, Username, VHost) ->
+start_link(Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid) ->
{ok, Pid} = gen_server2:start_link(
?MODULE, [Channel, ReaderPid, WriterPid,
- Username, VHost], []),
+ Username, VHost, CollectorPid], []),
Pid.
do(Pid, Method) ->
@@ -146,7 +146,7 @@ info_all(Items) ->
%%---------------------------------------------------------------------------
-init([Channel, ReaderPid, WriterPid, Username, VHost]) ->
+init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid]) ->
process_flag(trap_exit, true),
link(WriterPid),
ok = pg_local:join(rabbit_channels, self()),
@@ -165,6 +165,7 @@ init([Channel, ReaderPid, WriterPid, Username, VHost]) ->
most_recently_declared_queue = <<>>,
consumer_mapping = dict:new(),
blocking = dict:new(),
+ queue_collector_pid = CollectorPid,
flow = #flow{server = true, client = true,
pending = none}},
hibernate,
@@ -321,6 +322,22 @@ check_write_permitted(Resource, #ch{ username = Username}) ->
check_read_permitted(Resource, #ch{ username = Username}) ->
check_resource_access(Username, Resource, read).
+with_exclusive_access_or_die(QName, ReaderPid, F) ->
+ case rabbit_amqqueue:with_or_die(
+ QName, fun (Q = #amqqueue{exclusive_owner = Owner})
+ when Owner =:= none orelse Owner =:= ReaderPid ->
+ F(Q);
+ (_) ->
+ {error, wrong_exclusive_owner}
+ end) of
+ {error, wrong_exclusive_owner} ->
+ rabbit_misc:protocol_error(
+ resource_locked, "cannot obtain exclusive access to locked ~s",
+ [rabbit_misc:rs(QName)]);
+ Other ->
+ Other
+ end.
+
expand_queue_name_shortcut(<<>>, #ch{ most_recently_declared_queue = <<>> }) ->
rabbit_misc:protocol_error(
not_allowed, "no previously declared queue", []);
@@ -510,11 +527,11 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
%% In order to ensure that the consume_ok gets sent before
%% any messages are sent to the consumer, we get the queue
%% process to send the consume_ok on our behalf.
- case rabbit_amqqueue:with_or_die(
- QueueName,
+ case with_exclusive_access_or_die(
+ QueueName, ReaderPid,
fun (Q) ->
rabbit_amqqueue:basic_consume(
- Q, NoAck, ReaderPid, self(), LimiterPid,
+ Q, NoAck, self(), LimiterPid,
ActualConsumerTag, ExclusiveConsume,
ok_msg(NoWait, #'basic.consume_ok'{
consumer_tag = ActualConsumerTag}))
@@ -524,14 +541,6 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
dict:store(ActualConsumerTag,
QueueName,
ConsumerMapping)}};
- {error, queue_owned_by_another_connection} ->
- %% The spec is silent on which exception to use
- %% here. This seems reasonable?
- %% FIXME: check this
-
- rabbit_misc:protocol_error(
- resource_locked, "~s owned by another connection",
- [rabbit_misc:rs(QueueName)]);
{error, exclusive_consume_unavailable} ->
rabbit_misc:protocol_error(
access_refused, "~s in exclusive use",
@@ -701,34 +710,40 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin,
return_ok(State, NoWait, #'exchange.delete_ok'{})
end;
-handle_method(#'queue.declare'{queue = QueueNameBin,
- passive = false,
- durable = Durable,
- exclusive = ExclusiveDeclare,
+handle_method(#'queue.declare'{queue = QueueNameBin,
+ passive = false,
+ durable = Durable,
+ exclusive = ExclusiveDeclare,
auto_delete = AutoDelete,
- nowait = NoWait,
- arguments = Args},
- _, State = #ch { virtual_host = VHostPath,
- reader_pid = ReaderPid }) ->
- %% FIXME: atomic create&claim
+ nowait = NoWait,
+ arguments = Args},
+ _, State = #ch{virtual_host = VHostPath,
+ reader_pid = ReaderPid,
+ queue_collector_pid = CollectorPid}) ->
+ Owner = case ExclusiveDeclare of
+ true -> ReaderPid;
+ false -> none
+ end,
+ %% We use this in both branches, because queue_declare may yet return an
+ %% existing queue.
Finish =
- fun (Q) ->
- if ExclusiveDeclare ->
- case rabbit_amqqueue:claim_queue(Q, ReaderPid) of
- locked ->
- %% AMQP 0-8 doesn't say which
- %% exception to use, so we mimic QPid
- %% here.
- rabbit_misc:protocol_error(
- resource_locked,
- "cannot obtain exclusive access to locked ~s",
- [rabbit_misc:rs(Q#amqqueue.name)]);
- ok -> ok
- end;
- true ->
- ok
+ fun (#amqqueue{name = QueueName, exclusive_owner = Owner1} = Q)
+ when Owner =:= Owner1 ->
+ check_configure_permitted(QueueName, State),
+ %% We need to notify the reader within the channel
+ %% process so that we can be sure there are no
+ %% outstanding exclusive queues being declared as the
+ %% connection shuts down.
+ case Owner of
+ none -> ok;
+ _ -> ok = rabbit_reader_queue_collector:register_exclusive_queue(CollectorPid, Q)
end,
- Q
+ Q;
+ (#amqqueue{name = QueueName}) ->
+ rabbit_misc:protocol_error(
+ resource_locked,
+ "cannot obtain exclusive access to locked ~s",
+ [rabbit_misc:rs(QueueName)])
end,
Q = case rabbit_amqqueue:with(
rabbit_misc:r(VHostPath, queue, QueueNameBin),
@@ -740,34 +755,32 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
Other -> check_name('queue', Other)
end,
QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin),
- check_configure_permitted(QueueName, State),
- Finish(rabbit_amqqueue:declare(QueueName,
- Durable, AutoDelete, Args));
- Other = #amqqueue{name = QueueName} ->
- check_configure_permitted(QueueName, State),
+ Finish(rabbit_amqqueue:declare(QueueName, Durable, AutoDelete,
+ Args, Owner));
+ #amqqueue{} = Other ->
Other
end,
return_queue_declare_ok(State, NoWait, Q);
-handle_method(#'queue.declare'{queue = QueueNameBin,
+handle_method(#'queue.declare'{queue = QueueNameBin,
passive = true,
- nowait = NoWait},
- _, State = #ch{ virtual_host = VHostPath }) ->
+ nowait = NoWait},
+ _, State = #ch{virtual_host = VHostPath,
+ reader_pid = ReaderPid}) ->
QueueName = rabbit_misc:r(VHostPath, queue, QueueNameBin),
check_configure_permitted(QueueName, State),
- Q = rabbit_amqqueue:with_or_die(QueueName, fun (Q) -> Q end),
+ Q = with_exclusive_access_or_die(QueueName, ReaderPid, fun (Q) -> Q end),
return_queue_declare_ok(State, NoWait, Q);
handle_method(#'queue.delete'{queue = QueueNameBin,
if_unused = IfUnused,
if_empty = IfEmpty,
- nowait = NoWait
- },
- _, State) ->
+ nowait = NoWait},
+ _, State = #ch{reader_pid = ReaderPid}) ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
check_configure_permitted(QueueName, State),
- case rabbit_amqqueue:with_or_die(
- QueueName,
+ case with_exclusive_access_or_die(
+ QueueName, ReaderPid,
fun (Q) -> rabbit_amqqueue:delete(Q, IfUnused, IfEmpty) end) of
{error, in_use} ->
rabbit_misc:protocol_error(
@@ -777,8 +790,7 @@ handle_method(#'queue.delete'{queue = QueueNameBin,
precondition_failed, "~s not empty", [rabbit_misc:rs(QueueName)]);
{ok, PurgedMessageCount} ->
return_ok(State, NoWait,
- #'queue.delete_ok'{
- message_count = PurgedMessageCount})
+ #'queue.delete_ok'{message_count = PurgedMessageCount})
end;
handle_method(#'queue.bind'{queue = QueueNameBin,
@@ -800,11 +812,11 @@ handle_method(#'queue.unbind'{queue = QueueNameBin,
handle_method(#'queue.purge'{queue = QueueNameBin,
nowait = NoWait},
- _, State) ->
+ _, State = #ch{reader_pid = ReaderPid}) ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
check_read_permitted(QueueName, State),
- {ok, PurgedMessageCount} = rabbit_amqqueue:with_or_die(
- QueueName,
+ {ok, PurgedMessageCount} = with_exclusive_access_or_die(
+ QueueName, ReaderPid,
fun (Q) -> rabbit_amqqueue:purge(Q) end),
return_ok(State, NoWait,
#'queue.purge_ok'{message_count = PurgedMessageCount});
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index d1834b3b73..323d4d2fd1 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -59,8 +59,8 @@ start() ->
parse_args(FullCommand, #params{quiet = false,
node = rabbit_misc:makenode(NodeStr)}),
Inform = case Quiet of
- true -> fun(_Format, _Args1) -> ok end;
- false -> fun(Format, Args1) ->
+ true -> fun (_Format, _Args1) -> ok end;
+ false -> fun (Format, Args1) ->
io:format(Format ++ " ...~n", Args1)
end
end,
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 8f41392f83..835b1468ae 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -80,8 +80,9 @@
bind_res() | {'error', 'binding_not_found'}).
-spec(list_bindings/1 :: (vhost()) ->
[{exchange_name(), queue_name(), routing_key(), amqp_table()}]).
--spec(delete_queue_bindings/1 :: (queue_name()) -> fun(() -> none())).
--spec(delete_transient_queue_bindings/1 :: (queue_name()) -> fun(() -> none())).
+-spec(delete_queue_bindings/1 :: (queue_name()) -> fun (() -> none())).
+-spec(delete_transient_queue_bindings/1 :: (queue_name()) ->
+ fun (() -> none())).
-spec(delete/2 :: (exchange_name(), boolean()) ->
'ok' | not_found() | {'error', 'in_use'}).
-spec(list_queue_bindings/1 :: (queue_name()) ->
@@ -97,12 +98,12 @@
recover() ->
Exs = rabbit_misc:table_fold(
- fun(Exchange, Acc) ->
+ fun (Exchange, Acc) ->
ok = mnesia:write(rabbit_exchange, Exchange, write),
[Exchange | Acc]
end, [], rabbit_durable_exchange),
Bs = rabbit_misc:table_fold(
- fun(Route = #route{binding = B}, Acc) ->
+ fun (Route = #route{binding = B}, Acc) ->
{_, ReverseRoute} = route_with_reverse(Route),
ok = mnesia:write(rabbit_route,
Route, write),
@@ -349,7 +350,7 @@ continue({[], Continuation}) -> continue(mnesia:select(Continuation)).
call_with_exchange(Exchange, Fun) ->
rabbit_misc:execute_mnesia_transaction(
- fun() -> case mnesia:read({rabbit_exchange, Exchange}) of
+ fun () -> case mnesia:read({rabbit_exchange, Exchange}) of
[] -> {error, not_found};
[X] -> Fun(X)
end
@@ -357,7 +358,7 @@ call_with_exchange(Exchange, Fun) ->
call_with_exchange_and_queue(Exchange, Queue, Fun) ->
rabbit_misc:execute_mnesia_transaction(
- fun() -> case {mnesia:read({rabbit_exchange, Exchange}),
+ fun () -> case {mnesia:read({rabbit_exchange, Exchange}),
mnesia:read({rabbit_queue, Queue})} of
{[X], [Q]} -> Fun(X, Q);
{[ ], [_]} -> {error, exchange_not_found};
diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl
index b4fd91560f..a7ca20c80b 100644
--- a/src/rabbit_invariable_queue.erl
+++ b/src/rabbit_invariable_queue.erl
@@ -43,7 +43,7 @@
-include("rabbit.hrl").
--record(iv_state, { queue, qname, len, pending_ack }).
+-record(iv_state, { queue, qname, durable, len, pending_ack }).
-record(tx, { pending_messages, pending_acks, is_persistent }).
-ifdef(use_specs).
@@ -66,18 +66,23 @@ init(QName, IsDurable, Recover) ->
true -> rabbit_persister:queue_content(QName);
false -> []
end),
- #iv_state { queue = Q, qname = QName, len = queue:len(Q),
+ #iv_state { queue = Q,
+ qname = QName,
+ durable = IsDurable,
+ len = queue:len(Q),
pending_ack = dict:new() }.
terminate(State) ->
State #iv_state { queue = queue:new(), len = 0, pending_ack = dict:new() }.
-delete_and_terminate(State = #iv_state { qname = QName, pending_ack = PA }) ->
- ok = persist_acks(none, QName, dict:fetch_keys(PA), PA),
+delete_and_terminate(State = #iv_state { qname = QName, durable = IsDurable,
+ pending_ack = PA }) ->
+ ok = persist_acks(QName, IsDurable, none, dict:fetch_keys(PA), PA),
{_PLen, State1} = purge(State),
terminate(State1).
-purge(State = #iv_state { len = Len, queue = Q, qname = QName }) ->
+purge(State = #iv_state { queue = Q, qname = QName, durable = IsDurable,
+ len = Len }) ->
%% We do not purge messages pending acks.
{AckTags, PA} =
rabbit_misc:queue_fold(
@@ -85,57 +90,63 @@ purge(State = #iv_state { len = Len, queue = Q, qname = QName }) ->
Acc;
({Msg = #basic_message { guid = Guid }, IsDelivered},
{AckTagsN, PAN}) ->
- ok = persist_delivery(QName, Msg, IsDelivered),
+ ok = persist_delivery(QName, IsDurable, IsDelivered, Msg),
{[Guid | AckTagsN], dict:store(Guid, Msg, PAN)}
end, {[], dict:new()}, Q),
- ok = persist_acks(none, QName, AckTags, PA),
+ ok = persist_acks(QName, IsDurable, none, AckTags, PA),
{Len, State #iv_state { len = 0, queue = queue:new() }}.
-publish(Msg, State = #iv_state { queue = Q, qname = QName, len = Len }) ->
- ok = persist_message(none, QName, Msg),
+publish(Msg, State = #iv_state { queue = Q, qname = QName, durable = IsDurable,
+ len = Len }) ->
+ ok = persist_message(QName, IsDurable, none, Msg),
State #iv_state { queue = queue:in({Msg, false}, Q), len = Len + 1 }.
publish_delivered(false, _Msg, State) ->
{blank_ack, State};
publish_delivered(true, Msg = #basic_message { guid = Guid },
- State = #iv_state { qname = QName, len = 0,
- pending_ack = PA }) ->
- ok = persist_message(none, QName, Msg),
- ok = persist_delivery(QName, Msg, false),
+ State = #iv_state { qname = QName, durable = IsDurable,
+ len = 0, pending_ack = PA }) ->
+ ok = persist_message(QName, IsDurable, none, Msg),
+ ok = persist_delivery(QName, IsDurable, false, Msg),
{Guid, State #iv_state { pending_ack = dict:store(Guid, Msg, PA) }}.
fetch(_AckRequired, State = #iv_state { len = 0 }) ->
{empty, State};
-fetch(AckRequired, State = #iv_state { queue = Q, qname = QName, len = Len,
+fetch(AckRequired, State = #iv_state { len = Len, queue = Q, qname = QName,
+ durable = IsDurable,
pending_ack = PA }) ->
{{value, {Msg = #basic_message { guid = Guid }, IsDelivered}}, Q1} =
queue:out(Q),
Len1 = Len - 1,
- ok = persist_delivery(QName, Msg, IsDelivered),
+ ok = persist_delivery(QName, IsDurable, IsDelivered, Msg),
PA1 = dict:store(Guid, Msg, PA),
{AckTag, PA2} = case AckRequired of
true -> {Guid, PA1};
- false -> ok = persist_acks(none, QName, [Guid], PA1),
+ false -> ok = persist_acks(QName, IsDurable, none,
+ [Guid], PA1),
{blank_ack, PA}
end,
{{Msg, IsDelivered, AckTag, Len1},
State #iv_state { queue = Q1, len = Len1, pending_ack = PA2 }}.
-ack(AckTags, State = #iv_state { qname = QName, pending_ack = PA }) ->
- ok = persist_acks(none, QName, AckTags, PA),
+ack(AckTags, State = #iv_state { qname = QName, durable = IsDurable,
+ pending_ack = PA }) ->
+ ok = persist_acks(QName, IsDurable, none, AckTags, PA),
PA1 = remove_acks(AckTags, PA),
State #iv_state { pending_ack = PA1 }.
-tx_publish(Txn, Msg, State = #iv_state { qname = QName }) ->
+tx_publish(Txn, Msg, State = #iv_state { qname = QName,
+ durable = IsDurable }) ->
Tx = #tx { pending_messages = Pubs } = lookup_tx(Txn),
store_tx(Txn, Tx #tx { pending_messages = [Msg | Pubs] }),
- ok = persist_message(Txn, QName, Msg),
+ ok = persist_message(QName, IsDurable, Txn, Msg),
State.
-tx_ack(Txn, AckTags, State = #iv_state { qname = QName, pending_ack = PA }) ->
+tx_ack(Txn, AckTags, State = #iv_state { qname = QName, durable = IsDurable,
+ pending_ack = PA }) ->
Tx = #tx { pending_acks = Acks } = lookup_tx(Txn),
store_tx(Txn, Tx #tx { pending_acks = [AckTags | Acks] }),
- ok = persist_acks(Txn, QName, AckTags, PA),
+ ok = persist_acks(QName, IsDurable, Txn, AckTags, PA),
State.
tx_rollback(Txn, State = #iv_state { qname = QName }) ->
@@ -228,32 +239,33 @@ do_if_persistent(F, Txn, QName) ->
%%----------------------------------------------------------------------------
-persist_message(_Txn, _QName, #basic_message { is_persistent = false }) ->
- ok;
-persist_message(Txn, QName, Msg) ->
+persist_message(QName, true, Txn, Msg = #basic_message {
+ is_persistent = true }) ->
Msg1 = Msg #basic_message {
%% don't persist any recoverable decoded properties,
%% rebuild from properties_bin on restore
content = rabbit_binary_parser:clear_decoded_content(
Msg #basic_message.content)},
persist_work(Txn, QName,
- [{publish, Msg1, {QName, Msg1 #basic_message.guid}}]).
+ [{publish, Msg1, {QName, Msg1 #basic_message.guid}}]);
+persist_message(_QName, _IsDurable, _Txn, _Msg) ->
+ ok.
-persist_delivery(_QName, #basic_message { is_persistent = false },
- _IsDelivered) ->
- ok;
-persist_delivery(_QName, _Message, true) ->
- ok;
-persist_delivery(QName, #basic_message { guid = Guid }, _IsDelivered) ->
- persist_work(none, QName, [{deliver, {QName, Guid}}]).
+persist_delivery(QName, true, false, #basic_message { is_persistent = true,
+ guid = Guid }) ->
+ persist_work(none, QName, [{deliver, {QName, Guid}}]);
+persist_delivery(_QName, _IsDurable, _IsDelivered, _Msg) ->
+ ok.
-persist_acks(Txn, QName, AckTags, PA) ->
+persist_acks(QName, true, Txn, AckTags, PA) ->
persist_work(Txn, QName,
[{ack, {QName, Guid}} || Guid <- AckTags,
begin
{ok, Msg} = dict:find(Guid, PA),
Msg #basic_message.is_persistent
- end]).
+ end]);
+persist_acks(_QName, _IsDurable, _Txn, _AckTags, _PA) ->
+ ok.
persist_work(_Txn,_QName, []) ->
ok;
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 723b818b41..35739dcbdf 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -242,12 +242,12 @@ report_cover([Root]) when is_atom(Root) ->
report_cover(Root) ->
Dir = filename:join(Root, "cover"),
ok = filelib:ensure_dir(filename:join(Dir,"junk")),
- lists:foreach(fun(F) -> file:delete(F) end,
+ lists:foreach(fun (F) -> file:delete(F) end,
filelib:wildcard(filename:join(Dir, "*.html"))),
{ok, SummaryFile} = file:open(filename:join(Dir, "summary.txt"), [write]),
{CT, NCT} =
lists:foldl(
- fun(M,{CovTot, NotCovTot}) ->
+ fun (M,{CovTot, NotCovTot}) ->
{ok, {M, {Cov, NotCov}}} = cover:analyze(M, module),
ok = report_coverage_percentage(SummaryFile,
Cov, NotCov, M),
@@ -367,7 +367,7 @@ upmap(F, L) ->
Parent = self(),
Ref = make_ref(),
[receive {Ref, Result} -> Result end
- || _ <- [spawn(fun() -> Parent ! {Ref, F(X)} end) || X <- L]].
+ || _ <- [spawn(fun () -> Parent ! {Ref, F(X)} end) || X <- L]].
map_in_order(F, L) ->
lists:reverse(
@@ -537,18 +537,24 @@ pid_to_string(Pid) when is_pid(Pid) ->
%% inverse of above
string_to_pid(Str) ->
+ Err = {error, {invalid_pid_syntax, Str}},
%% The \ before the trailing $ is only there to keep emacs
%% font-lock from getting confused.
case re:run(Str, "^<(.*)\\.([0-9]+)\\.([0-9]+)>\$",
[{capture,all_but_first,list}]) of
{match, [NodeStr, IdStr, SerStr]} ->
- %% turn the triple into a pid - see pid_to_string
- <<131,NodeEnc/binary>> = term_to_binary(list_to_atom(NodeStr)),
+ %% the NodeStr atom might be quoted, so we have to parse
+ %% it rather than doing a simple list_to_atom
+ NodeAtom = case erl_scan:string(NodeStr) of
+ {ok, [{atom, _, X}], _} -> X;
+ {error, _, _} -> throw(Err)
+ end,
+ <<131,NodeEnc/binary>> = term_to_binary(NodeAtom),
Id = list_to_integer(IdStr),
Ser = list_to_integer(SerStr),
binary_to_term(<<131,103,NodeEnc/binary,Id:32,Ser:32,0:8>>);
nomatch ->
- throw({error, {invalid_pid_syntax, Str}})
+ throw(Err)
end.
version_compare(A, B, lte) ->
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 55a6761d2d..a0b7aa4e7f 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -346,7 +346,7 @@ table_has_copy_type(TabDef, DiscType) ->
create_local_table_copies(Type) ->
lists:foreach(
- fun({Tab, TabDef}) ->
+ fun ({Tab, TabDef}) ->
HasDiscCopies = table_has_copy_type(TabDef, disc_copies),
HasDiscOnlyCopies = table_has_copy_type(TabDef, disc_only_copies),
LocalTab = proplists:get_bool(local_content, TabDef),
diff --git a/src/rabbit_multi.erl b/src/rabbit_multi.erl
index 336f74bf9a..5db1d77a32 100644
--- a/src/rabbit_multi.erl
+++ b/src/rabbit_multi.erl
@@ -111,7 +111,7 @@ action(start_all, [NodeCount], RpcTimeout) ->
action(status, [], RpcTimeout) ->
io:format("Status of all running nodes...~n", []),
call_all_nodes(
- fun({Node, Pid}) ->
+ fun ({Node, Pid}) ->
RabbitRunning =
case is_rabbit_running(Node, RpcTimeout) of
false -> not_running;
@@ -123,7 +123,7 @@ action(status, [], RpcTimeout) ->
action(stop_all, [], RpcTimeout) ->
io:format("Stopping all nodes...~n", []),
- call_all_nodes(fun({Node, Pid}) ->
+ call_all_nodes(fun ({Node, Pid}) ->
io:format("Stopping node ~p~n", [Node]),
rpc:call(Node, rabbit, stop_and_halt, []),
case kill_wait(Pid, RpcTimeout, false) of
diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl
index 406977b42a..975954fcd2 100644
--- a/src/rabbit_net.erl
+++ b/src/rabbit_net.erl
@@ -66,7 +66,7 @@ async_recv(Sock, Length, Timeout) when is_record(Sock, ssl_socket) ->
Pid = self(),
Ref = make_ref(),
- spawn(fun() -> Pid ! {inet_async, Sock, Ref,
+ spawn(fun () -> Pid ! {inet_async, Sock, Ref,
ssl:recv(Sock#ssl_socket.ssl, Length, Timeout)}
end),
diff --git a/src/rabbit_persister.erl b/src/rabbit_persister.erl
index 3cd42e4753..8d3c2dc082 100644
--- a/src/rabbit_persister.erl
+++ b/src/rabbit_persister.erl
@@ -236,7 +236,7 @@ log_work(CreateWorkUnit, MessageList,
snapshot = Snapshot = #psnapshot{messages = Messages}}) ->
Unit = CreateWorkUnit(
rabbit_misc:map_in_order(
- fun(M = {publish, Message, QK = {_QName, PKey}}) ->
+ fun (M = {publish, Message, QK = {_QName, PKey}}) ->
case ets:lookup(Messages, PKey) of
[_] -> {tied, QK};
[] -> ets:insert(Messages, {PKey, Message}),
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index c6bd2973e7..73a58f1328 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -52,11 +52,12 @@
-define(NORMAL_TIMEOUT, 3).
-define(CLOSING_TIMEOUT, 1).
-define(CHANNEL_TERMINATION_TIMEOUT, 3).
--define(SLEEP_BEFORE_SILENT_CLOSE, 3000).
+-define(SILENT_CLOSE_DELAY, 3).
%---------------------------------------------------------------------------
--record(v1, {sock, connection, callback, recv_ref, connection_state}).
+-record(v1, {sock, connection, callback, recv_ref, connection_state,
+ queue_collector}).
-define(INFO_KEYS,
[pid, address, port, peer_address, peer_port,
@@ -234,6 +235,7 @@ start_connection(Parent, Deb, Sock, SockTransform) ->
erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(),
handshake_timeout),
ProfilingValue = setup_profiling(),
+ {ok, Collector} = rabbit_reader_queue_collector:start_link(),
try
mainloop(Parent, Deb, switch_callback(
#v1{sock = ClientSock,
@@ -245,7 +247,8 @@ start_connection(Parent, Deb, Sock, SockTransform) ->
client_properties = none},
callback = uninitialized_callback,
recv_ref = none,
- connection_state = pre_init},
+ connection_state = pre_init,
+ queue_collector = Collector},
handshake, 8))
catch
Ex -> (if Ex == connection_closed_abruptly ->
@@ -263,7 +266,9 @@ start_connection(Parent, Deb, Sock, SockTransform) ->
%% output to be sent, which results in unnecessary delays.
%%
%% gen_tcp:close(ClientSock),
- teardown_profiling(ProfilingValue)
+ teardown_profiling(ProfilingValue),
+ rabbit_reader_queue_collector:shutdown(Collector),
+ rabbit_misc:unlink_and_capture_exit(Collector)
end,
done.
@@ -426,11 +431,17 @@ wait_for_channel_termination(N, TimerRef) ->
exit(channel_termination_timeout)
end.
-maybe_close(State = #v1{connection_state = closing}) ->
+maybe_close(State = #v1{connection_state = closing,
+ queue_collector = Collector}) ->
case all_channels() of
- [] -> ok = send_on_channel0(
- State#v1.sock, #'connection.close_ok'{}),
- close_connection(State);
+ [] ->
+ %% Spec says "Exclusive queues may only be accessed by the current
+ %% connection, and are deleted when that connection closes."
+ %% This does not strictly imply synchrony, but in practice it seems
+ %% to be what people assume.
+ rabbit_reader_queue_collector:delete_all(Collector),
+ ok = send_on_channel0(State#v1.sock, #'connection.close_ok'{}),
+ close_connection(State);
_ -> State
end;
maybe_close(State) ->
@@ -579,7 +590,7 @@ handle_method0(MethodName, FieldsBin, State) ->
%% We don't trust the client at this point - force
%% them to wait for a bit so they can't DOS us with
%% repeated failed logins etc.
- Other -> timer:sleep(?SLEEP_BEFORE_SILENT_CLOSE),
+ Other -> timer:sleep(?SILENT_CLOSE_DELAY * 1000),
throw({channel0_error, Other, CompleteReason})
end
end.
@@ -727,15 +738,16 @@ i(Item, #v1{}) ->
%%--------------------------------------------------------------------------
-send_to_new_channel(Channel, AnalyzedFrame, State) ->
+send_to_new_channel(Channel, AnalyzedFrame,
+ State = #v1{queue_collector = Collector}) ->
#v1{sock = Sock, connection = #connection{
frame_max = FrameMax,
user = #user{username = Username},
vhost = VHost}} = State,
WriterPid = rabbit_writer:start(Sock, Channel, FrameMax),
ChPid = rabbit_framing_channel:start_link(
- fun rabbit_channel:start_link/5,
- [Channel, self(), WriterPid, Username, VHost]),
+ fun rabbit_channel:start_link/6,
+ [Channel, self(), WriterPid, Username, VHost, Collector]),
put({channel, Channel}, {chpid, ChPid}),
put({chpid, ChPid}, {channel, Channel}),
ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame).
diff --git a/src/rabbit_reader_queue_collector.erl b/src/rabbit_reader_queue_collector.erl
new file mode 100644
index 0000000000..8d4e8fdb42
--- /dev/null
+++ b/src/rabbit_reader_queue_collector.erl
@@ -0,0 +1,108 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_reader_queue_collector).
+
+-behaviour(gen_server).
+
+-export([start_link/0, register_exclusive_queue/2, delete_all/1, shutdown/1]).
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-record(state, {exclusive_queues}).
+
+-include("rabbit.hrl").
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(start_link/0 :: () -> {'ok', pid()}).
+-spec(register_exclusive_queue/2 :: (pid(), amqqueue()) -> 'ok').
+-spec(delete_all/1 :: (pid()) -> 'ok').
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+start_link() ->
+ gen_server:start_link(?MODULE, [], []).
+
+register_exclusive_queue(CollectorPid, Q) ->
+ gen_server:call(CollectorPid, {register_exclusive_queue, Q}, infinity).
+
+delete_all(CollectorPid) ->
+ gen_server:call(CollectorPid, delete_all, infinity).
+
+shutdown(CollectorPid) ->
+ gen_server:call(CollectorPid, shutdown, infinity).
+
+%%----------------------------------------------------------------------------
+
+init([]) ->
+ {ok, #state{exclusive_queues = dict:new()}}.
+
+%%--------------------------------------------------------------------------
+
+handle_call({register_exclusive_queue, Q}, _From,
+ State = #state{exclusive_queues = Queues}) ->
+ MonitorRef = erlang:monitor(process, Q#amqqueue.pid),
+ {reply, ok,
+ State#state{exclusive_queues = dict:store(MonitorRef, Q, Queues)}};
+
+handle_call(delete_all, _From,
+ State = #state{exclusive_queues = ExclusiveQueues}) ->
+ [rabbit_misc:with_exit_handler(
+ fun () -> ok end,
+ fun () ->
+ erlang:demonitor(MonitorRef),
+ rabbit_amqqueue:delete(Q, false, false)
+ end)
+ || {MonitorRef, Q} <- dict:to_list(ExclusiveQueues)],
+ {reply, ok, State};
+
+handle_call(shutdown, _From, State) ->
+ {stop, normal, ok, State}.
+
+handle_cast(_Msg, State) ->
+ {noreply, State}.
+
+handle_info({'DOWN', MonitorRef, process, _DownPid, _Reason},
+ State = #state{exclusive_queues = ExclusiveQueues}) ->
+ {noreply, State#state{exclusive_queues =
+ dict:erase(MonitorRef, ExclusiveQueues)}}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl
index 03979d6c60..5cd15a9462 100644
--- a/src/rabbit_router.erl
+++ b/src/rabbit_router.erl
@@ -57,14 +57,17 @@ deliver(QPids, Delivery = #delivery{mandatory = false,
%% is preserved. This scales much better than the non-immediate
%% case below.
delegate:invoke_no_result(
- QPids, fun(Pid) -> rabbit_amqqueue:deliver(Pid, Delivery) end),
+ QPids, fun (Pid) -> rabbit_amqqueue:deliver(Pid, Delivery) end),
{routed, QPids};
deliver(QPids, Delivery) ->
{Success, _} =
delegate:invoke(QPids,
- fun(Pid) -> rabbit_amqqueue:deliver(Pid, Delivery) end),
- {Routed, Handled} = lists:foldl(fun fold_deliveries/2, {false, []}, Success),
+ fun (Pid) ->
+ rabbit_amqqueue:deliver(Pid, Delivery)
+ end),
+ {Routed, Handled} =
+ lists:foldl(fun fold_deliveries/2, {false, []}, Success),
check_delivery(Delivery#delivery.mandatory, Delivery#delivery.immediate,
{Routed, Handled}).
@@ -88,7 +91,7 @@ match_routing_key(Name, RoutingKey) ->
lookup_qpids(Queues) ->
sets:fold(
- fun(Key, Acc) ->
+ fun (Key, Acc) ->
case mnesia:dirty_read({rabbit_queue, Key}) of
[#amqqueue{pid = QPid}] -> [QPid | Acc];
[] -> Acc
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 357e9949a1..05efdcaca3 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -748,17 +748,16 @@ test_user_management() ->
passed.
test_server_status() ->
-
%% create a few things so there is some useful information to list
Writer = spawn(fun () -> receive shutdown -> ok end end),
- Ch = rabbit_channel:start_link(1, self(), Writer, <<"user">>, <<"/">>),
+ Ch = rabbit_channel:start_link(1, self(), Writer, <<"user">>, <<"/">>,
+ self()),
[Q, Q2] = [#amqqueue{} = rabbit_amqqueue:declare(
rabbit_misc:r(<<"/">>, queue, Name),
- false, false, []) ||
+ false, false, [], none) ||
Name <- [<<"foo">>, <<"bar">>]],
- ok = rabbit_amqqueue:claim_queue(Q, self()),
- ok = rabbit_amqqueue:basic_consume(Q, true, self(), Ch, undefined,
+ ok = rabbit_amqqueue:basic_consume(Q, true, Ch, undefined,
<<"ctag">>, true, undefined),
%% list queues
@@ -825,7 +824,7 @@ test_hooks() ->
{[arg1, arg2], 1, 3} = get(arg_hook_test_fired),
%% Invoking Pids
- Remote = fun() ->
+ Remote = fun () ->
receive
{rabbitmq_hook,[remote_test,test,[],Target]} ->
Target ! invoked
@@ -842,23 +841,6 @@ test_hooks() ->
end,
passed.
-test_delegates_async(SecondaryNode) ->
- Self = self(),
- Sender = fun(Pid) -> Pid ! {invoked, Self} end,
-
- Responder = make_responder(fun({invoked, Pid}) -> Pid ! response end),
-
- ok = delegate:invoke_no_result(spawn(Responder), Sender),
- ok = delegate:invoke_no_result(spawn(SecondaryNode, Responder), Sender),
- await_response(2),
-
- LocalPids = spawn_responders(node(), Responder, 10),
- RemotePids = spawn_responders(SecondaryNode, Responder, 10),
- ok = delegate:invoke_no_result(LocalPids ++ RemotePids, Sender),
- await_response(20),
-
- passed.
-
test_memory_pressure_receiver(Pid) ->
receive
shutdown ->
@@ -897,7 +879,8 @@ test_memory_pressure_sync(Ch, Writer) ->
test_memory_pressure_spawn() ->
Me = self(),
Writer = spawn(fun () -> test_memory_pressure_receiver(Me) end),
- Ch = rabbit_channel:start_link(1, self(), Writer, <<"user">>, <<"/">>),
+ Ch = rabbit_channel:start_link(1, self(), Writer, <<"user">>, <<"/">>,
+ self()),
ok = rabbit_channel:do(Ch, #'channel.open'{}),
MRef = erlang:monitor(process, Ch),
receive #'channel.open_ok'{} -> ok
@@ -930,7 +913,12 @@ test_memory_pressure() ->
ok = test_memory_pressure_receive_flow(true),
%% if we publish at this point, the channel should die
- ok = rabbit_channel:do(Ch0, #'basic.publish'{}, #content{}),
+ Content = #content{class_id = element(1, rabbit_framing:method_id(
+ 'basic.publish')),
+ properties = none,
+ properties_bin = <<>>,
+ payload_fragments_rev = []},
+ ok = rabbit_channel:do(Ch0, #'basic.publish'{}, Content),
receive {'DOWN', MRef0, process, Ch0, normal} ->
ok
after 1000 ->
@@ -971,10 +959,28 @@ test_memory_pressure() ->
passed.
-make_responder(FMsg) ->
- fun() ->
+test_delegates_async(SecondaryNode) ->
+ Self = self(),
+ Sender = fun (Pid) -> Pid ! {invoked, Self} end,
+
+ Responder = make_responder(fun ({invoked, Pid}) -> Pid ! response end),
+
+ ok = delegate:invoke_no_result(spawn(Responder), Sender),
+ ok = delegate:invoke_no_result(spawn(SecondaryNode, Responder), Sender),
+ await_response(2),
+
+ LocalPids = spawn_responders(node(), Responder, 10),
+ RemotePids = spawn_responders(SecondaryNode, Responder, 10),
+ ok = delegate:invoke_no_result(LocalPids ++ RemotePids, Sender),
+ await_response(20),
+
+ passed.
+
+make_responder(FMsg) -> make_responder(FMsg, timeout).
+make_responder(FMsg, Throw) ->
+ fun () ->
receive Msg -> FMsg(Msg)
- after 1000 -> throw(timeout)
+ after 1000 -> throw(Throw)
end
end.
@@ -1001,24 +1007,28 @@ must_exit(Fun) ->
end.
test_delegates_sync(SecondaryNode) ->
- Sender = fun(Pid) -> gen_server:call(Pid, invoked) end,
- BadSender = fun(_Pid) -> exit(exception) end,
+ Sender = fun (Pid) -> gen_server:call(Pid, invoked) end,
+ BadSender = fun (_Pid) -> exit(exception) end,
- Responder = make_responder(fun({'$gen_call', From, invoked}) ->
+ Responder = make_responder(fun ({'$gen_call', From, invoked}) ->
gen_server:reply(From, response)
end),
+ BadResponder = make_responder(fun ({'$gen_call', From, invoked}) ->
+ gen_server:reply(From, response)
+ end, bad_responder_died),
+
response = delegate:invoke(spawn(Responder), Sender),
response = delegate:invoke(spawn(SecondaryNode, Responder), Sender),
- must_exit(fun() -> delegate:invoke(spawn(Responder), BadSender) end),
- must_exit(fun() ->
- delegate:invoke(spawn(SecondaryNode, Responder), BadSender) end),
+ must_exit(fun () -> delegate:invoke(spawn(BadResponder), BadSender) end),
+ must_exit(fun () ->
+ delegate:invoke(spawn(SecondaryNode, BadResponder), BadSender) end),
LocalGoodPids = spawn_responders(node(), Responder, 2),
RemoteGoodPids = spawn_responders(SecondaryNode, Responder, 2),
- LocalBadPids = spawn_responders(node(), Responder, 2),
- RemoteBadPids = spawn_responders(SecondaryNode, Responder, 2),
+ LocalBadPids = spawn_responders(node(), BadResponder, 2),
+ RemoteBadPids = spawn_responders(SecondaryNode, BadResponder, 2),
{GoodRes, []} = delegate:invoke(LocalGoodPids ++ RemoteGoodPids, Sender),
true = lists:all(fun ({_, response}) -> true end, GoodRes),
diff --git a/src/supervisor2.erl b/src/supervisor2.erl
index 5575351269..0b1d726562 100644
--- a/src/supervisor2.erl
+++ b/src/supervisor2.erl
@@ -301,13 +301,13 @@ handle_call({terminate_child, Name}, _From, State) ->
handle_call(which_children, _From, State) when ?is_simple(State) ->
[#child{child_type = CT, modules = Mods}] = State#state.children,
- Reply = lists:map(fun({Pid, _}) -> {undefined, Pid, CT, Mods} end,
+ Reply = lists:map(fun ({Pid, _}) -> {undefined, Pid, CT, Mods} end,
?DICT:to_list(State#state.dynamics)),
{reply, Reply, State};
handle_call(which_children, _From, State) ->
Resp =
- lists:map(fun(#child{pid = Pid, name = Name,
+ lists:map(fun (#child{pid = Pid, name = Name,
child_type = ChildType, modules = Mods}) ->
{Name, Pid, ChildType, Mods}
end,
@@ -415,7 +415,7 @@ update_childspec1([], Children, KeepOld) ->
lists:reverse(Children ++ KeepOld).
update_chsp(OldCh, Children) ->
- case lists:map(fun(Ch) when OldCh#child.name =:= Ch#child.name ->
+ case lists:map(fun (Ch) when OldCh#child.name =:= Ch#child.name ->
Ch#child{pid = OldCh#child.pid};
(Ch) ->
Ch
@@ -828,7 +828,7 @@ validShutdown(Shutdown, _) -> throw({invalid_shutdown, Shutdown}).
validMods(dynamic) -> true;
validMods(Mods) when is_list(Mods) ->
- lists:foreach(fun(Mod) ->
+ lists:foreach(fun (Mod) ->
if
is_atom(Mod) -> ok;
true -> throw({invalid_module, Mod})