diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_fifo.erl | 19 |
1 files changed, 17 insertions, 2 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index 9ddecb0824..8332043af9 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -195,7 +195,8 @@ apply(Meta, #return{msg_ids = MsgIds, consumer_id = ConsumerId}, apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt, drain = Drain, consumer_id = ConsumerId}, #?MODULE{consumers = Cons0, - service_queue = ServiceQueue0} = State0) -> + service_queue = ServiceQueue0, + waiting_consumers = Waiting0} = State0) -> case Cons0 of #{ConsumerId := #consumer{delivery_count = DelCnt} = Con0} -> %% this can go below 0 when credit is reduced @@ -208,7 +209,7 @@ apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt, {State1, ok, Effects} = checkout(Meta, State0#?MODULE{service_queue = ServiceQueue, consumers = Cons}, []), - Response = {send_credit_reply, maps:size(State1#?MODULE.messages)}, + Response = {send_credit_reply, messages_ready(State1)}, %% by this point all checkouts for the updated credit value %% should be processed so we can evaluate the drain case Drain of @@ -232,6 +233,20 @@ apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt, {multi, [Response, {send_drained, [{CTag, Drained}]}]}, Effects} end; + _ when Waiting0 /= [] -> + %% there are waiting consuemrs + case lists:keytake(ConsumerId, 1, Waiting0) of + {value, {_, Con0 = #consumer{delivery_count = DelCnt}}, Waiting} -> + %% the consumer is a waiting one + %% grant the credit + C = max(0, RemoteDelCnt + NewCredit - DelCnt), + Con = Con0#consumer{credit = C}, + State = State0#?MODULE{waiting_consumers = + [{ConsumerId, Con} | Waiting]}, + {State, {send_credit_reply, messages_ready(State)}}; + false -> + {State0, ok} + end; _ -> %% credit for unknown consumer - just ignore {State0, ok} |
