summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-09-28 13:48:32 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2010-09-28 13:48:32 +0100
commit74146c8906eb649608cb44cd1f9b816cece5d4b7 (patch)
tree3049c3037c25994bd32df2abc10084c3d8a7d639
parente53655cd779190dd9160e25c322f7354cc5e3b61 (diff)
parent43ff2077045189a7daf16cf06c048f195384aae9 (diff)
downloadrabbitmq-server-git-74146c8906eb649608cb44cd1f9b816cece5d4b7.tar.gz
Merging bug 23143 into default (drive-by fixes include correcting the spec for delete_exclusive, and one cosmetic)
-rw-r--r--src/rabbit_amqqueue.erl8
-rw-r--r--src/rabbit_amqqueue_process.erl17
-rw-r--r--src/rabbit_queue_collector.erl2
3 files changed, 24 insertions, 3 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 3e677c3809..42bddc5e81 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -31,7 +31,7 @@
-module(rabbit_amqqueue).
--export([start/0, stop/0, declare/5, delete/3, purge/1]).
+-export([start/0, stop/0, declare/5, delete_exclusive/1, delete/3, purge/1]).
-export([internal_declare/2, internal_delete/1,
maybe_run_queue_via_backing_queue/2,
update_ram_duration/1, set_ram_duration_target/2,
@@ -115,6 +115,9 @@
(rabbit_types:amqqueue())
-> {'ok', non_neg_integer(), non_neg_integer()}).
-spec(emit_stats/1 :: (rabbit_types:amqqueue()) -> 'ok').
+-spec(delete_exclusive/1 :: (rabbit_types:amqqueue())
+ -> rabbit_types:ok_or_error2(qlen(),
+ 'not_exclusive')).
-spec(delete/3 ::
(rabbit_types:amqqueue(), 'false', 'false')
-> qlen();
@@ -359,6 +362,9 @@ stat(#amqqueue{pid = QPid}) -> delegate_call(QPid, stat, infinity).
emit_stats(#amqqueue{pid = QPid}) ->
delegate_cast(QPid, emit_stats).
+delete_exclusive(#amqqueue{ pid = QPid }) ->
+ gen_server2:call(QPid, delete_exclusive, infinity).
+
delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) ->
delegate_call(QPid, {delete, IfUnused, IfEmpty}, infinity).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index d15a6eb3a9..2c53a8e319 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -43,7 +43,7 @@
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2, handle_pre_hibernate/1, prioritise_call/3,
- prioritise_cast/2]).
+ prioritise_cast/2, prioritise_info/2]).
-import(queue).
-import(erlang).
@@ -593,6 +593,7 @@ prioritise_call(Msg, _From, _State) ->
info -> 9;
{info, _Items} -> 9;
consumers -> 9;
+ delete_exclusive -> 8;
{maybe_run_queue_via_backing_queue, _Fun} -> 6;
_ -> 0
end.
@@ -611,6 +612,10 @@ prioritise_cast(Msg, _State) ->
_ -> 0
end.
+prioritise_info({'DOWN', _MonitorRef, process, DownPid, _Reason},
+ #q{q = #amqqueue{exclusive_owner = DownPid}}) -> 8;
+prioritise_info(_Msg, _State) -> 0.
+
handle_call({init, Recover}, From,
State = #q{q = #amqqueue{exclusive_owner = none}}) ->
declare(Recover, From, State);
@@ -780,6 +785,16 @@ handle_call(stat, _From, State = #q{backing_queue = BQ,
active_consumers = ActiveConsumers}) ->
reply({ok, BQ:len(BQS), queue:len(ActiveConsumers)}, State);
+handle_call(delete_exclusive, _From,
+ State = #q{ backing_queue_state = BQS,
+ backing_queue = BQ,
+ q = #amqqueue{exclusive_owner = Owner}
+ }) when Owner =/= none ->
+ {stop, normal, {ok, BQ:len(BQS)}, State};
+
+handle_call(delete_exclusive, _From, State) ->
+ reply({error, not_exclusive}, State);
+
handle_call({delete, IfUnused, IfEmpty}, _From,
State = #q{backing_queue_state = BQS, backing_queue = BQ}) ->
IsEmpty = BQ:is_empty(BQS),
diff --git a/src/rabbit_queue_collector.erl b/src/rabbit_queue_collector.erl
index 0a49b94d09..0b8efc8f83 100644
--- a/src/rabbit_queue_collector.erl
+++ b/src/rabbit_queue_collector.erl
@@ -81,7 +81,7 @@ handle_call(delete_all, _From, State = #state{queues = Queues}) ->
fun () -> ok end,
fun () ->
erlang:demonitor(MonitorRef),
- rabbit_amqqueue:delete(Q, false, false)
+ rabbit_amqqueue:delete_exclusive(Q)
end)
|| {MonitorRef, Q} <- dict:to_list(Queues)],
{reply, ok, State}.