summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_amqqueue_process.erl2
-rw-r--r--src/rabbit_variable_queue.erl17
2 files changed, 11 insertions, 8 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 93ebc3c550..36d047e8a4 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -145,7 +145,7 @@ terminate(_Reason, State = #q{variable_queue_state = VQS}) ->
%% called internal_delete first, we would then have a race between
%% the disk delete and a new queue with the same name being
%% created and published to.
- _VQS = rabbit_variable_queue:delete(VQS1),
+ _VQS = rabbit_variable_queue:delete_and_terminate(VQS1),
ok = rabbit_amqqueue:internal_delete(qname(State)).
code_change(_OldVsn, State, _Extra) ->
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 9d5c2b99ad..0043bb5ff7 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -33,10 +33,10 @@
-export([init/1, terminate/1, publish/2, publish_delivered/2,
set_queue_ram_duration_target/2, remeasure_rates/1,
- ram_duration/1, fetch/1, ack/2, len/1, is_empty/1, purge/1, delete/1,
- requeue/2, tx_publish/2, tx_rollback/2, tx_commit/4,
- tx_commit_from_msg_store/4, tx_commit_from_vq/1, needs_sync/1,
- flush_journal/1, status/1]).
+ ram_duration/1, fetch/1, ack/2, len/1, is_empty/1, purge/1,
+ delete_and_terminate/1, requeue/2, tx_publish/2, tx_rollback/2,
+ tx_commit/4, tx_commit_from_msg_store/4, tx_commit_from_vq/1,
+ needs_sync/1, flush_journal/1, status/1]).
%%----------------------------------------------------------------------------
%% Definitions:
@@ -230,7 +230,7 @@
-spec(len/1 :: (vqstate()) -> non_neg_integer()).
-spec(is_empty/1 :: (vqstate()) -> boolean()).
-spec(purge/1 :: (vqstate()) -> {non_neg_integer(), vqstate()}).
--spec(delete/1 :: (vqstate()) -> vqstate()).
+-spec(delete_and_terminate/1 :: (vqstate()) -> vqstate()).
-spec(requeue/2 :: ([{basic_message(), ack()}], vqstate()) -> vqstate()).
-spec(tx_publish/2 :: (basic_message(), vqstate()) -> vqstate()).
-spec(tx_rollback/2 :: ([msg_id()], vqstate()) -> vqstate()).
@@ -451,8 +451,10 @@ purge(State = #vqstate { q4 = Q4, index_state = IndexState, len = Len }) ->
%% the only difference between purge and delete is that delete also
%% needs to delete everything that's been delivered and not ack'd.
-delete(State) ->
- {_PurgeCount, State1 = #vqstate { index_state = IndexState }} = purge(State),
+delete_and_terminate(State) ->
+ {_PurgeCount, State1 = #vqstate { index_state = IndexState,
+ msg_store_read_state = MSCState }} =
+ purge(State),
IndexState1 =
case rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(
IndexState) of
@@ -464,6 +466,7 @@ delete(State) ->
IndexState3
end,
IndexState4 = rabbit_queue_index:terminate_and_erase(IndexState1),
+ rabbit_msg_store:client_terminate(MSCState),
State1 #vqstate { index_state = IndexState4 }.
%% [{Msg, AckTag}]