summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorkjnilsson <knilsson@pivotal.io>2019-02-28 11:41:51 +0000
committerkjnilsson <knilsson@pivotal.io>2019-02-28 11:41:51 +0000
commit1485ca841451c50e05674fe90aa9763820464706 (patch)
treef75bcfb6836fe6a5e2406cc8e941a5e6af886520 /src
parent44928187312c9ffaaa272a4daa85211b4d400e34 (diff)
downloadrabbitmq-server-git-1485ca841451c50e05674fe90aa9763820464706.tar.gz
Ensure waiting consumers can receive credit
Else they may never be able to do any processing when they get promoted to active and AMQP 1.0 is used. [#164135123]
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_fifo.erl19
1 files changed, 17 insertions, 2 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index 9ddecb0824..8332043af9 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -195,7 +195,8 @@ apply(Meta, #return{msg_ids = MsgIds, consumer_id = ConsumerId},
apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt,
drain = Drain, consumer_id = ConsumerId},
#?MODULE{consumers = Cons0,
- service_queue = ServiceQueue0} = State0) ->
+ service_queue = ServiceQueue0,
+ waiting_consumers = Waiting0} = State0) ->
case Cons0 of
#{ConsumerId := #consumer{delivery_count = DelCnt} = Con0} ->
%% this can go below 0 when credit is reduced
@@ -208,7 +209,7 @@ apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt,
{State1, ok, Effects} =
checkout(Meta, State0#?MODULE{service_queue = ServiceQueue,
consumers = Cons}, []),
- Response = {send_credit_reply, maps:size(State1#?MODULE.messages)},
+ Response = {send_credit_reply, messages_ready(State1)},
%% by this point all checkouts for the updated credit value
%% should be processed so we can evaluate the drain
case Drain of
@@ -232,6 +233,20 @@ apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt,
{multi, [Response, {send_drained, [{CTag, Drained}]}]},
Effects}
end;
+ _ when Waiting0 /= [] ->
+ %% there are waiting consuemrs
+ case lists:keytake(ConsumerId, 1, Waiting0) of
+ {value, {_, Con0 = #consumer{delivery_count = DelCnt}}, Waiting} ->
+ %% the consumer is a waiting one
+ %% grant the credit
+ C = max(0, RemoteDelCnt + NewCredit - DelCnt),
+ Con = Con0#consumer{credit = C},
+ State = State0#?MODULE{waiting_consumers =
+ [{ConsumerId, Con} | Waiting]},
+ {State, {send_credit_reply, messages_ready(State)}};
+ false ->
+ {State0, ok}
+ end;
_ ->
%% credit for unknown consumer - just ignore
{State0, ok}