diff options
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 17 |
2 files changed, 11 insertions, 8 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 93ebc3c550..36d047e8a4 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -145,7 +145,7 @@ terminate(_Reason, State = #q{variable_queue_state = VQS}) -> %% called internal_delete first, we would then have a race between %% the disk delete and a new queue with the same name being %% created and published to. - _VQS = rabbit_variable_queue:delete(VQS1), + _VQS = rabbit_variable_queue:delete_and_terminate(VQS1), ok = rabbit_amqqueue:internal_delete(qname(State)). code_change(_OldVsn, State, _Extra) -> diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 9d5c2b99ad..0043bb5ff7 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -33,10 +33,10 @@ -export([init/1, terminate/1, publish/2, publish_delivered/2, set_queue_ram_duration_target/2, remeasure_rates/1, - ram_duration/1, fetch/1, ack/2, len/1, is_empty/1, purge/1, delete/1, - requeue/2, tx_publish/2, tx_rollback/2, tx_commit/4, - tx_commit_from_msg_store/4, tx_commit_from_vq/1, needs_sync/1, - flush_journal/1, status/1]). + ram_duration/1, fetch/1, ack/2, len/1, is_empty/1, purge/1, + delete_and_terminate/1, requeue/2, tx_publish/2, tx_rollback/2, + tx_commit/4, tx_commit_from_msg_store/4, tx_commit_from_vq/1, + needs_sync/1, flush_journal/1, status/1]). %%---------------------------------------------------------------------------- %% Definitions: @@ -230,7 +230,7 @@ -spec(len/1 :: (vqstate()) -> non_neg_integer()). -spec(is_empty/1 :: (vqstate()) -> boolean()). -spec(purge/1 :: (vqstate()) -> {non_neg_integer(), vqstate()}). --spec(delete/1 :: (vqstate()) -> vqstate()). +-spec(delete_and_terminate/1 :: (vqstate()) -> vqstate()). -spec(requeue/2 :: ([{basic_message(), ack()}], vqstate()) -> vqstate()). -spec(tx_publish/2 :: (basic_message(), vqstate()) -> vqstate()). -spec(tx_rollback/2 :: ([msg_id()], vqstate()) -> vqstate()). @@ -451,8 +451,10 @@ purge(State = #vqstate { q4 = Q4, index_state = IndexState, len = Len }) -> %% the only difference between purge and delete is that delete also %% needs to delete everything that's been delivered and not ack'd. -delete(State) -> - {_PurgeCount, State1 = #vqstate { index_state = IndexState }} = purge(State), +delete_and_terminate(State) -> + {_PurgeCount, State1 = #vqstate { index_state = IndexState, + msg_store_read_state = MSCState }} = + purge(State), IndexState1 = case rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id( IndexState) of @@ -464,6 +466,7 @@ delete(State) -> IndexState3 end, IndexState4 = rabbit_queue_index:terminate_and_erase(IndexState1), + rabbit_msg_store:client_terminate(MSCState), State1 #vqstate { index_state = IndexState4 }. %% [{Msg, AckTag}] |
