summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl8
1 files changed, 6 insertions, 2 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 5d78b2055f..406429ef8f 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -132,11 +132,16 @@ code_change(_OldVsn, State, _Extra) ->
%%----------------------------------------------------------------------------
reply(Reply, NewState) ->
+ assert_invariant(NewState),
{reply, Reply, start_memory_timer(NewState), hibernate}.
noreply(NewState) ->
+ assert_invariant(NewState),
{noreply, start_memory_timer(NewState), hibernate}.
+assert_invariant(#q { active_consumers = AC, mixed_state = MS }) ->
+ true = (queue:is_empty(AC) orelse rabbit_mixed_queue:is_empty(MS)).
+
start_memory_timer(State = #q { memory_report_timer = undefined }) ->
{ok, TRef} = timer:send_after(?MINIMUM_MEMORY_REPORT_TIME_INTERVAL,
report_memory),
@@ -329,8 +334,7 @@ deliver_or_requeue_n(MsgsWithAcks, State) ->
{{_RemainingLengthMinusOne, AutoAcks, OutstandingMsgs}, NewState} =
deliver_msgs_to_consumers(
Funs, {length(MsgsWithAcks), [], MsgsWithAcks}, State),
- {ok, MS} = rabbit_mixed_queue:ack(AutoAcks,
- NewState #q.mixed_state),
+ {ok, MS} = rabbit_mixed_queue:ack(AutoAcks, NewState #q.mixed_state),
case OutstandingMsgs of
[] -> NewState #q { mixed_state = MS };
_ -> {ok, MS1} = rabbit_mixed_queue:requeue(OutstandingMsgs, MS),