diff options
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 25 |
1 files changed, 19 insertions, 6 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 5f96b84be1..7230e09cd2 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -170,8 +170,11 @@ deliver_queue(Fun, FunAcc0, C = #cr{limiter_pid = LimiterPid, unsent_message_count = Count, unacked_messages = UAM} = ch_record(ChPid), - case not(AckRequired) orelse rabbit_limiter:can_send( - LimiterPid, self()) of + IsMsgReady = Fun(is_message_ready, FunAcc0, State), + case not(AckRequired) orelse + ( IsMsgReady andalso + rabbit_limiter:can_send( LimiterPid, self()) + ) of true -> case Fun(AckRequired, FunAcc0, State) of {empty, FunAcc1, State2} -> @@ -199,15 +202,21 @@ deliver_queue(Fun, FunAcc0, true -> deliver_queue(Fun, FunAcc1, State3) end end; - false -> + %% if IsMsgReady then (AckRequired and we've hit the limiter) + false when IsMsgReady -> store_ch_record(C#cr{is_limit_active = true}), NewConsumers = block_consumers(ChPid, RoundRobinTail), - deliver_queue(Fun, FunAcc0, State #q { round_robin = NewConsumers }) + deliver_queue(Fun, FunAcc0, State #q { round_robin = NewConsumers }); + false -> + %% no message was ready, so we don't need to block anyone + {FunAcc0, State} end; {empty, _} -> {FunAcc0, State} end. +deliver_from_queue(is_message_ready, undefined, #q { mixed_state = MS }) -> + not(rabbit_mixed_queue:is_empty(MS)); deliver_from_queue(AckRequired, Acc = undefined, State = #q { mixed_state = MS }) -> {Res, MS2} = rabbit_mixed_queue:deliver(MS), MS3 = case {Res, AckRequired} of @@ -225,7 +234,9 @@ run_message_queue(State) -> attempt_immediate_delivery(none, Msg, State) -> Fun = - fun (AckRequired, false, State2) -> + fun (is_message_ready, false, _State) -> + true; + (AckRequired, false, State2) -> {AckTag, State3} = if AckRequired -> {ok, AckTag2, MS} = rabbit_mixed_queue:publish_delivered(Msg, @@ -255,7 +266,7 @@ deliver_or_enqueue(Txn, Msg, State) -> %% all these messages have already been delivered at least once and %% not ack'd, but need to be either redelivered or requeued deliver_or_requeue_n([], State) -> - State; + run_message_queue(State); deliver_or_requeue_n(MsgsWithAcks, State) -> {{_RemainingLengthMinusOne, AutoAcks, OutstandingMsgs}, NewState} = deliver_queue(fun deliver_or_requeue_msgs/3, {length(MsgsWithAcks) - 1, [], MsgsWithAcks}, State), @@ -266,6 +277,8 @@ deliver_or_requeue_n(MsgsWithAcks, State) -> NewState #q { mixed_state = MS2 } end. +deliver_or_requeue_msgs(is_message_ready, {Len, _AcksAcc, _MsgsWithAcks}, _State) -> + -1 < Len; deliver_or_requeue_msgs(false, {Len, AcksAcc, [{Msg, AckTag} | MsgsWithAcks]}, State) -> {{Msg, true, noack, Len}, {Len - 1, [AckTag|AcksAcc], MsgsWithAcks}, State}; deliver_or_requeue_msgs(true, {Len, AcksAcc, [{Msg, AckTag} | MsgsWithAcks]}, State) -> |
