diff options
| author | Matthew Sackman <matthew@rabbitmq.com> | 2011-06-02 15:40:26 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@rabbitmq.com> | 2011-06-02 15:40:26 +0100 |
| commit | c2475fe28fad0f55e32484bed76091476a419e3d (patch) | |
| tree | b387fa6fa21e9e0db3aa7d54c8ec2651f0fab217 /src | |
| parent | d03d08e75937bec437d664203c7a0ff845a3f46a (diff) | |
| download | rabbitmq-server-git-c2475fe28fad0f55e32484bed76091476a419e3d.tar.gz | |
Reduce diff from bug23554: Extend backing queue api to present reason for termination
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 12 | ||||
| -rw-r--r-- | src/rabbit_backing_queue.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 8 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 6 |
4 files changed, 15 insertions, 15 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index f7b710a482..07a24af828 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -114,16 +114,16 @@ init(Q) -> msg_id_to_channel = dict:new()}, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. -terminate(shutdown, State = #q{backing_queue = BQ}) -> - terminate_shutdown(fun (BQS) -> BQ:terminate(BQS) end, State); -terminate({shutdown, _}, State = #q{backing_queue = BQ}) -> - terminate_shutdown(fun (BQS) -> BQ:terminate(BQS) end, State); -terminate(_Reason, State = #q{backing_queue = BQ}) -> +terminate(shutdown = R, State = #q{backing_queue = BQ}) -> + terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State); +terminate({shutdown, _} = R, State = #q{backing_queue = BQ}) -> + terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State); +terminate(Reason, State = #q{backing_queue = BQ}) -> %% FIXME: How do we cancel active subscriptions? terminate_shutdown(fun (BQS) -> rabbit_event:notify( queue_deleted, [{pid, self()}]), - BQS1 = BQ:delete_and_terminate(BQS), + BQS1 = BQ:delete_and_terminate(Reason, BQS), %% don't care if the internal delete %% doesn't return 'ok'. rabbit_amqqueue:internal_delete(qname(State)), diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index addaabc584..217ad3eb5b 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -49,11 +49,11 @@ behaviour_info(callbacks) -> {init, 4}, %% Called on queue shutdown when queue isn't being deleted. - {terminate, 1}, + {terminate, 2}, %% Called when the queue is terminating and needs to delete all %% its content. - {delete_and_terminate, 1}, + {delete_and_terminate, 2}, %% Remove all messages in the queue, but not messages which have %% been fetched and are pending acks. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 1a37cdfffa..3f4aa54e7f 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -2116,7 +2116,7 @@ with_fresh_variable_queue(Fun) -> {delta, {delta, undefined, 0, undefined}}, {q3, 0}, {q4, 0}, {len, 0}]), - _ = rabbit_variable_queue:delete_and_terminate(Fun(VQ)), + _ = rabbit_variable_queue:delete_and_terminate(shutdown, Fun(VQ)), passed. test_variable_queue() -> @@ -2284,7 +2284,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere1(VQ0) -> Count + Count, VQ3), {VQ5, _AckTags1} = variable_queue_fetch(Count, false, false, Count, VQ4), - _VQ6 = rabbit_variable_queue:terminate(VQ5), + _VQ6 = rabbit_variable_queue:terminate(shutdown, VQ5), VQ7 = variable_queue_init(test_amqqueue(true), true), {{_Msg1, true, _AckTag1, Count1}, VQ8} = rabbit_variable_queue:fetch(true, VQ7), @@ -2301,7 +2301,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) -> {_Guids, VQ4} = rabbit_variable_queue:requeue(AckTags, fun(X) -> X end, VQ3), VQ5 = rabbit_variable_queue:timeout(VQ4), - _VQ6 = rabbit_variable_queue:terminate(VQ5), + _VQ6 = rabbit_variable_queue:terminate(shutdown, VQ5), VQ7 = variable_queue_init(test_amqqueue(true), true), {empty, VQ8} = rabbit_variable_queue:fetch(false, VQ7), VQ8. @@ -2336,7 +2336,7 @@ test_queue_recover() -> VQ1 = variable_queue_init(Q, true), {{_Msg1, true, _AckTag1, CountMinusOne}, VQ2} = rabbit_variable_queue:fetch(true, VQ1), - _VQ3 = rabbit_variable_queue:delete_and_terminate(VQ2), + _VQ3 = rabbit_variable_queue:delete_and_terminate(shutdown, VQ2), rabbit_amqqueue:internal_delete(QName) end), passed. diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 8ac3ad43f7..a167cca0c5 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -16,7 +16,7 @@ -module(rabbit_variable_queue). --export([init/4, terminate/1, delete_and_terminate/1, +-export([init/4, terminate/2, delete_and_terminate/2, purge/1, publish/4, publish_delivered/5, drain_confirmed/1, fetch/2, ack/2, tx_publish/5, tx_ack/3, tx_rollback/2, tx_commit/4, requeue/3, len/1, is_empty/1, dropwhile/2, @@ -452,7 +452,7 @@ init(#amqqueue { name = QueueName, durable = true }, true, init(true, IndexState, DeltaCount, Terms1, AsyncCallback, SyncCallback, PersistentClient, TransientClient). -terminate(State) -> +terminate(_Reason, State) -> State1 = #vqstate { persistent_count = PCount, index_state = IndexState, msg_store_clients = {MSCStateP, MSCStateT} } = @@ -473,7 +473,7 @@ terminate(State) -> %% the only difference between purge and delete is that delete also %% needs to delete everything that's been delivered and not ack'd. -delete_and_terminate(State) -> +delete_and_terminate(_Reason, State) -> %% TODO: there is no need to interact with qi at all - which we do %% as part of 'purge' and 'remove_pending_ack', other than %% deleting it. |
