summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_amqqueue_process.erl25
1 files changed, 19 insertions, 6 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 5f96b84be1..7230e09cd2 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -170,8 +170,11 @@ deliver_queue(Fun, FunAcc0,
C = #cr{limiter_pid = LimiterPid,
unsent_message_count = Count,
unacked_messages = UAM} = ch_record(ChPid),
- case not(AckRequired) orelse rabbit_limiter:can_send(
- LimiterPid, self()) of
+ IsMsgReady = Fun(is_message_ready, FunAcc0, State),
+ case not(AckRequired) orelse
+ ( IsMsgReady andalso
+ rabbit_limiter:can_send( LimiterPid, self())
+ ) of
true ->
case Fun(AckRequired, FunAcc0, State) of
{empty, FunAcc1, State2} ->
@@ -199,15 +202,21 @@ deliver_queue(Fun, FunAcc0,
true -> deliver_queue(Fun, FunAcc1, State3)
end
end;
- false ->
+ %% if IsMsgReady then (AckRequired and we've hit the limiter)
+ false when IsMsgReady ->
store_ch_record(C#cr{is_limit_active = true}),
NewConsumers = block_consumers(ChPid, RoundRobinTail),
- deliver_queue(Fun, FunAcc0, State #q { round_robin = NewConsumers })
+ deliver_queue(Fun, FunAcc0, State #q { round_robin = NewConsumers });
+ false ->
+ %% no message was ready, so we don't need to block anyone
+ {FunAcc0, State}
end;
{empty, _} ->
{FunAcc0, State}
end.
+deliver_from_queue(is_message_ready, undefined, #q { mixed_state = MS }) ->
+ not(rabbit_mixed_queue:is_empty(MS));
deliver_from_queue(AckRequired, Acc = undefined, State = #q { mixed_state = MS }) ->
{Res, MS2} = rabbit_mixed_queue:deliver(MS),
MS3 = case {Res, AckRequired} of
@@ -225,7 +234,9 @@ run_message_queue(State) ->
attempt_immediate_delivery(none, Msg, State) ->
Fun =
- fun (AckRequired, false, State2) ->
+ fun (is_message_ready, false, _State) ->
+ true;
+ (AckRequired, false, State2) ->
{AckTag, State3} =
if AckRequired ->
{ok, AckTag2, MS} = rabbit_mixed_queue:publish_delivered(Msg,
@@ -255,7 +266,7 @@ deliver_or_enqueue(Txn, Msg, State) ->
%% all these messages have already been delivered at least once and
%% not ack'd, but need to be either redelivered or requeued
deliver_or_requeue_n([], State) ->
- State;
+ run_message_queue(State);
deliver_or_requeue_n(MsgsWithAcks, State) ->
{{_RemainingLengthMinusOne, AutoAcks, OutstandingMsgs}, NewState} =
deliver_queue(fun deliver_or_requeue_msgs/3, {length(MsgsWithAcks) - 1, [], MsgsWithAcks}, State),
@@ -266,6 +277,8 @@ deliver_or_requeue_n(MsgsWithAcks, State) ->
NewState #q { mixed_state = MS2 }
end.
+deliver_or_requeue_msgs(is_message_ready, {Len, _AcksAcc, _MsgsWithAcks}, _State) ->
+ -1 < Len;
deliver_or_requeue_msgs(false, {Len, AcksAcc, [{Msg, AckTag} | MsgsWithAcks]}, State) ->
{{Msg, true, noack, Len}, {Len - 1, [AckTag|AcksAcc], MsgsWithAcks}, State};
deliver_or_requeue_msgs(true, {Len, AcksAcc, [{Msg, AckTag} | MsgsWithAcks]}, State) ->