summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2011-06-02 15:40:26 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2011-06-02 15:40:26 +0100
commitc2475fe28fad0f55e32484bed76091476a419e3d (patch)
treeb387fa6fa21e9e0db3aa7d54c8ec2651f0fab217 /src
parentd03d08e75937bec437d664203c7a0ff845a3f46a (diff)
downloadrabbitmq-server-git-c2475fe28fad0f55e32484bed76091476a419e3d.tar.gz
Reduce diff from bug23554: Extend backing queue api to present reason for termination
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl12
-rw-r--r--src/rabbit_backing_queue.erl4
-rw-r--r--src/rabbit_tests.erl8
-rw-r--r--src/rabbit_variable_queue.erl6
4 files changed, 15 insertions, 15 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index f7b710a482..07a24af828 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -114,16 +114,16 @@ init(Q) ->
msg_id_to_channel = dict:new()}, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
-terminate(shutdown, State = #q{backing_queue = BQ}) ->
- terminate_shutdown(fun (BQS) -> BQ:terminate(BQS) end, State);
-terminate({shutdown, _}, State = #q{backing_queue = BQ}) ->
- terminate_shutdown(fun (BQS) -> BQ:terminate(BQS) end, State);
-terminate(_Reason, State = #q{backing_queue = BQ}) ->
+terminate(shutdown = R, State = #q{backing_queue = BQ}) ->
+ terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State);
+terminate({shutdown, _} = R, State = #q{backing_queue = BQ}) ->
+ terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State);
+terminate(Reason, State = #q{backing_queue = BQ}) ->
%% FIXME: How do we cancel active subscriptions?
terminate_shutdown(fun (BQS) ->
rabbit_event:notify(
queue_deleted, [{pid, self()}]),
- BQS1 = BQ:delete_and_terminate(BQS),
+ BQS1 = BQ:delete_and_terminate(Reason, BQS),
%% don't care if the internal delete
%% doesn't return 'ok'.
rabbit_amqqueue:internal_delete(qname(State)),
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index addaabc584..217ad3eb5b 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -49,11 +49,11 @@ behaviour_info(callbacks) ->
{init, 4},
%% Called on queue shutdown when queue isn't being deleted.
- {terminate, 1},
+ {terminate, 2},
%% Called when the queue is terminating and needs to delete all
%% its content.
- {delete_and_terminate, 1},
+ {delete_and_terminate, 2},
%% Remove all messages in the queue, but not messages which have
%% been fetched and are pending acks.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 1a37cdfffa..3f4aa54e7f 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -2116,7 +2116,7 @@ with_fresh_variable_queue(Fun) ->
{delta, {delta, undefined, 0, undefined}},
{q3, 0}, {q4, 0},
{len, 0}]),
- _ = rabbit_variable_queue:delete_and_terminate(Fun(VQ)),
+ _ = rabbit_variable_queue:delete_and_terminate(shutdown, Fun(VQ)),
passed.
test_variable_queue() ->
@@ -2284,7 +2284,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere1(VQ0) ->
Count + Count, VQ3),
{VQ5, _AckTags1} = variable_queue_fetch(Count, false, false,
Count, VQ4),
- _VQ6 = rabbit_variable_queue:terminate(VQ5),
+ _VQ6 = rabbit_variable_queue:terminate(shutdown, VQ5),
VQ7 = variable_queue_init(test_amqqueue(true), true),
{{_Msg1, true, _AckTag1, Count1}, VQ8} =
rabbit_variable_queue:fetch(true, VQ7),
@@ -2301,7 +2301,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) ->
{_Guids, VQ4} =
rabbit_variable_queue:requeue(AckTags, fun(X) -> X end, VQ3),
VQ5 = rabbit_variable_queue:timeout(VQ4),
- _VQ6 = rabbit_variable_queue:terminate(VQ5),
+ _VQ6 = rabbit_variable_queue:terminate(shutdown, VQ5),
VQ7 = variable_queue_init(test_amqqueue(true), true),
{empty, VQ8} = rabbit_variable_queue:fetch(false, VQ7),
VQ8.
@@ -2336,7 +2336,7 @@ test_queue_recover() ->
VQ1 = variable_queue_init(Q, true),
{{_Msg1, true, _AckTag1, CountMinusOne}, VQ2} =
rabbit_variable_queue:fetch(true, VQ1),
- _VQ3 = rabbit_variable_queue:delete_and_terminate(VQ2),
+ _VQ3 = rabbit_variable_queue:delete_and_terminate(shutdown, VQ2),
rabbit_amqqueue:internal_delete(QName)
end),
passed.
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 8ac3ad43f7..a167cca0c5 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -16,7 +16,7 @@
-module(rabbit_variable_queue).
--export([init/4, terminate/1, delete_and_terminate/1,
+-export([init/4, terminate/2, delete_and_terminate/2,
purge/1, publish/4, publish_delivered/5, drain_confirmed/1,
fetch/2, ack/2, tx_publish/5, tx_ack/3, tx_rollback/2, tx_commit/4,
requeue/3, len/1, is_empty/1, dropwhile/2,
@@ -452,7 +452,7 @@ init(#amqqueue { name = QueueName, durable = true }, true,
init(true, IndexState, DeltaCount, Terms1, AsyncCallback, SyncCallback,
PersistentClient, TransientClient).
-terminate(State) ->
+terminate(_Reason, State) ->
State1 = #vqstate { persistent_count = PCount,
index_state = IndexState,
msg_store_clients = {MSCStateP, MSCStateT} } =
@@ -473,7 +473,7 @@ terminate(State) ->
%% the only difference between purge and delete is that delete also
%% needs to delete everything that's been delivered and not ack'd.
-delete_and_terminate(State) ->
+delete_and_terminate(_Reason, State) ->
%% TODO: there is no need to interact with qi at all - which we do
%% as part of 'purge' and 'remove_pending_ack', other than
%% deleting it.