diff options
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 7 |
2 files changed, 9 insertions, 4 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index b2519b7aff..a92d136b7a 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -411,11 +411,11 @@ deliver_from_queue_deliver(AckRequired, false, State) -> run_message_queue(State) -> Funs = {fun deliver_from_queue_pred/2, fun deliver_from_queue_deliver/3}, - #q{backing_queue = BQ, backing_queue_state = BQS} = + State1 = #q{backing_queue = BQ, backing_queue_state = BQS} = drop_expired_messages(State), IsEmpty = BQ:is_empty(BQS), - {_IsEmpty1, State1} = deliver_msgs_to_consumers(Funs, IsEmpty, State), - State1. + {_IsEmpty1, State2} = deliver_msgs_to_consumers(Funs, IsEmpty, State1), + State2. attempt_delivery(none, _ChPid, Message, State = #q{backing_queue = BQ}) -> PredFun = fun (IsEmpty, _State) -> not IsEmpty end, diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 7d58402613..31ec007e51 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -618,8 +618,13 @@ internal_fetch(AckRequired, PCount1 = PCount - one_if(IsPersistent andalso not AckRequired), Len1 = Len - 1, + + RamMsgCount1 = case Msg =:= undefined of + true -> RamMsgCount; + false -> RamMsgCount - 1 + end, {{Msg, IsDelivered, AckTag, Len1}, - a(State #vqstate { ram_msg_count = RamMsgCount - 1, + a(State #vqstate { ram_msg_count = RamMsgCount1, out_counter = OutCount + 1, index_state = IndexState2, len = Len1, |
