summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue.erl24
-rw-r--r--src/rabbit_amqqueue_process.erl18
-rw-r--r--src/rabbit_tests.erl2
3 files changed, 24 insertions, 20 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index c95efa1447..0ce7efd6e0 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -32,7 +32,7 @@
%% internal
--export([internal_declare/2, internal_delete/1, run_backing_queue/3,
+-export([internal_declare/2, internal_delete/2, run_backing_queue/3,
set_ram_duration_target/2, set_maximum_since_use/2]).
-include("rabbit.hrl").
@@ -142,11 +142,11 @@
-spec(notify_sent_queue_down/1 :: (pid()) -> 'ok').
-spec(unblock/2 :: (pid(), pid()) -> 'ok').
-spec(flush_all/2 :: (qpids(), pid()) -> 'ok').
--spec(internal_delete/1 ::
- (name()) -> rabbit_types:ok_or_error('not_found') |
- rabbit_types:connection_exit() |
- fun (() -> rabbit_types:ok_or_error('not_found') |
- rabbit_types:connection_exit())).
+-spec(internal_delete/2 ::
+ (name(), pid()) -> rabbit_types:ok_or_error('not_found') |
+ rabbit_types:connection_exit() |
+ fun (() -> rabbit_types:ok_or_error('not_found') |
+ rabbit_types:connection_exit())).
-spec(run_backing_queue/3 ::
(pid(), atom(),
(fun ((atom(), A) -> {[rabbit_types:msg_id()], A}))) -> 'ok').
@@ -229,7 +229,7 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) ->
[ExistingQ = #amqqueue{pid = QPid}] ->
case rabbit_misc:is_process_alive(QPid) of
true -> rabbit_misc:const(ExistingQ);
- false -> TailFun = internal_delete(QueueName),
+ false -> TailFun = internal_delete(QueueName, QPid),
fun () -> TailFun(), ExistingQ end
end
end
@@ -515,13 +515,19 @@ internal_delete1(QueueName) ->
%% after the transaction.
rabbit_binding:remove_for_destination(QueueName).
-internal_delete(QueueName) ->
+internal_delete(QueueName, QPid) ->
rabbit_misc:execute_mnesia_tx_with_tail(
fun () ->
case mnesia:wread({rabbit_queue, QueueName}) of
[] -> rabbit_misc:const({error, not_found});
[_] -> Deletions = internal_delete1(QueueName),
- rabbit_binding:process_deletions(Deletions)
+ T = rabbit_binding:process_deletions(Deletions),
+ fun() ->
+ ok = T(),
+ ok = rabbit_event:notify(queue_deleted,
+ [{pid, QPid},
+ {name, QueueName}])
+ end
end
end).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index b8b27443da..829627157d 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -183,16 +183,14 @@ terminate({shutdown, _} = R, State = #q{backing_queue = BQ}) ->
terminate(Reason, State = #q{q = #amqqueue{name = QName},
backing_queue = BQ}) ->
%% FIXME: How do we cancel active subscriptions?
- terminate_shutdown(fun (BQS) ->
- rabbit_event:notify(
- queue_deleted, [{pid, self()},
- {name, QName}]),
- BQS1 = BQ:delete_and_terminate(Reason, BQS),
- %% don't care if the internal delete
- %% doesn't return 'ok'.
- rabbit_amqqueue:internal_delete(qname(State)),
- BQS1
- end, State).
+ terminate_shutdown(
+ fun (BQS) ->
+ BQS1 = BQ:delete_and_terminate(Reason, BQS),
+ %% don't care if the internal delete
+ %% doesn't return 'ok'.
+ rabbit_amqqueue:internal_delete(QName, self()),
+ BQS1
+ end, State).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index f7e3baa706..629224b227 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -2530,7 +2530,7 @@ test_queue_recover() ->
{{_Msg1, true, _AckTag1, CountMinusOne}, VQ2} =
rabbit_variable_queue:fetch(true, VQ1),
_VQ3 = rabbit_variable_queue:delete_and_terminate(shutdown, VQ2),
- rabbit_amqqueue:internal_delete(QName)
+ rabbit_amqqueue:internal_delete(QName, QPid1)
end),
passed.