diff options
| author | kjnilsson <knilsson@pivotal.io> | 2019-02-28 11:41:51 +0000 |
|---|---|---|
| committer | kjnilsson <knilsson@pivotal.io> | 2019-02-28 11:41:51 +0000 |
| commit | 1485ca841451c50e05674fe90aa9763820464706 (patch) | |
| tree | f75bcfb6836fe6a5e2406cc8e941a5e6af886520 | |
| parent | 44928187312c9ffaaa272a4daa85211b4d400e34 (diff) | |
| download | rabbitmq-server-git-1485ca841451c50e05674fe90aa9763820464706.tar.gz | |
Ensure waiting consumers can receive credit
Else they may never be able to do any processing when they get promoted
to active and AMQP 1.0 is used.
[#164135123]
| -rw-r--r-- | src/rabbit_fifo.erl | 19 | ||||
| -rw-r--r-- | test/rabbit_fifo_SUITE.erl | 33 |
2 files changed, 50 insertions, 2 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index 9ddecb0824..8332043af9 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -195,7 +195,8 @@ apply(Meta, #return{msg_ids = MsgIds, consumer_id = ConsumerId}, apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt, drain = Drain, consumer_id = ConsumerId}, #?MODULE{consumers = Cons0, - service_queue = ServiceQueue0} = State0) -> + service_queue = ServiceQueue0, + waiting_consumers = Waiting0} = State0) -> case Cons0 of #{ConsumerId := #consumer{delivery_count = DelCnt} = Con0} -> %% this can go below 0 when credit is reduced @@ -208,7 +209,7 @@ apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt, {State1, ok, Effects} = checkout(Meta, State0#?MODULE{service_queue = ServiceQueue, consumers = Cons}, []), - Response = {send_credit_reply, maps:size(State1#?MODULE.messages)}, + Response = {send_credit_reply, messages_ready(State1)}, %% by this point all checkouts for the updated credit value %% should be processed so we can evaluate the drain case Drain of @@ -232,6 +233,20 @@ apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt, {multi, [Response, {send_drained, [{CTag, Drained}]}]}, Effects} end; + _ when Waiting0 /= [] -> + %% there are waiting consuemrs + case lists:keytake(ConsumerId, 1, Waiting0) of + {value, {_, Con0 = #consumer{delivery_count = DelCnt}}, Waiting} -> + %% the consumer is a waiting one + %% grant the credit + C = max(0, RemoteDelCnt + NewCredit - DelCnt), + Con = Con0#consumer{credit = C}, + State = State0#?MODULE{waiting_consumers = + [{ConsumerId, Con} | Waiting]}, + {State, {send_credit_reply, messages_ready(State)}}; + false -> + {State0, ok} + end; _ -> %% credit for unknown consumer - just ignore {State0, ok} diff --git a/test/rabbit_fifo_SUITE.erl b/test/rabbit_fifo_SUITE.erl index a45d423e69..40884359f8 100644 --- a/test/rabbit_fifo_SUITE.erl +++ b/test/rabbit_fifo_SUITE.erl @@ -1085,6 +1085,39 @@ single_active_cancelled_with_unacked_test(_) -> ?assertMatch([], State6#rabbit_fifo.waiting_consumers), ok. +single_active_with_credited_test(_) -> + State0 = init(#{name => ?FUNCTION_NAME, + queue_resource => rabbit_misc:r("/", queue, + atom_to_binary(?FUNCTION_NAME, utf8)), + release_cursor_interval => 0, + single_active_consumer_on => true}), + + C1 = {<<"ctag1">>, self()}, + C2 = {<<"ctag2">>, self()}, + % adding some consumers + AddConsumer = fun(C, S0) -> + {S, _, _} = apply( + meta(1), + make_checkout(C, + {auto, 0, credited}, + #{}), + S0), + S + end, + State1 = lists:foldl(AddConsumer, State0, [C1, C2]), + + %% add some credit + C1Cred = rabbit_fifo:make_credit(C1, 5, 0, false), + {State2, _, _Effects2} = apply(meta(3), C1Cred, State1), + C2Cred = rabbit_fifo:make_credit(C2, 4, 0, false), + {State3, _} = apply(meta(4), C2Cred, State2), + %% both consumers should have credit + ?assertMatch(#{C1 := #consumer{credit = 5}}, + State3#rabbit_fifo.consumers), + ?assertMatch([{C2, #consumer{credit = 4}}], + State3#rabbit_fifo.waiting_consumers), + ok. + meta(Idx) -> #{index => Idx, term => 1}. |
