summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorkjnilsson <knilsson@pivotal.io>2019-02-28 11:41:51 +0000
committerkjnilsson <knilsson@pivotal.io>2019-02-28 11:41:51 +0000
commit1485ca841451c50e05674fe90aa9763820464706 (patch)
treef75bcfb6836fe6a5e2406cc8e941a5e6af886520
parent44928187312c9ffaaa272a4daa85211b4d400e34 (diff)
downloadrabbitmq-server-git-1485ca841451c50e05674fe90aa9763820464706.tar.gz
Ensure waiting consumers can receive credit
Else they may never be able to do any processing when they get promoted to active and AMQP 1.0 is used. [#164135123]
-rw-r--r--src/rabbit_fifo.erl19
-rw-r--r--test/rabbit_fifo_SUITE.erl33
2 files changed, 50 insertions, 2 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index 9ddecb0824..8332043af9 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -195,7 +195,8 @@ apply(Meta, #return{msg_ids = MsgIds, consumer_id = ConsumerId},
apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt,
drain = Drain, consumer_id = ConsumerId},
#?MODULE{consumers = Cons0,
- service_queue = ServiceQueue0} = State0) ->
+ service_queue = ServiceQueue0,
+ waiting_consumers = Waiting0} = State0) ->
case Cons0 of
#{ConsumerId := #consumer{delivery_count = DelCnt} = Con0} ->
%% this can go below 0 when credit is reduced
@@ -208,7 +209,7 @@ apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt,
{State1, ok, Effects} =
checkout(Meta, State0#?MODULE{service_queue = ServiceQueue,
consumers = Cons}, []),
- Response = {send_credit_reply, maps:size(State1#?MODULE.messages)},
+ Response = {send_credit_reply, messages_ready(State1)},
%% by this point all checkouts for the updated credit value
%% should be processed so we can evaluate the drain
case Drain of
@@ -232,6 +233,20 @@ apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt,
{multi, [Response, {send_drained, [{CTag, Drained}]}]},
Effects}
end;
+ _ when Waiting0 /= [] ->
+ %% there are waiting consuemrs
+ case lists:keytake(ConsumerId, 1, Waiting0) of
+ {value, {_, Con0 = #consumer{delivery_count = DelCnt}}, Waiting} ->
+ %% the consumer is a waiting one
+ %% grant the credit
+ C = max(0, RemoteDelCnt + NewCredit - DelCnt),
+ Con = Con0#consumer{credit = C},
+ State = State0#?MODULE{waiting_consumers =
+ [{ConsumerId, Con} | Waiting]},
+ {State, {send_credit_reply, messages_ready(State)}};
+ false ->
+ {State0, ok}
+ end;
_ ->
%% credit for unknown consumer - just ignore
{State0, ok}
diff --git a/test/rabbit_fifo_SUITE.erl b/test/rabbit_fifo_SUITE.erl
index a45d423e69..40884359f8 100644
--- a/test/rabbit_fifo_SUITE.erl
+++ b/test/rabbit_fifo_SUITE.erl
@@ -1085,6 +1085,39 @@ single_active_cancelled_with_unacked_test(_) ->
?assertMatch([], State6#rabbit_fifo.waiting_consumers),
ok.
+single_active_with_credited_test(_) ->
+ State0 = init(#{name => ?FUNCTION_NAME,
+ queue_resource => rabbit_misc:r("/", queue,
+ atom_to_binary(?FUNCTION_NAME, utf8)),
+ release_cursor_interval => 0,
+ single_active_consumer_on => true}),
+
+ C1 = {<<"ctag1">>, self()},
+ C2 = {<<"ctag2">>, self()},
+ % adding some consumers
+ AddConsumer = fun(C, S0) ->
+ {S, _, _} = apply(
+ meta(1),
+ make_checkout(C,
+ {auto, 0, credited},
+ #{}),
+ S0),
+ S
+ end,
+ State1 = lists:foldl(AddConsumer, State0, [C1, C2]),
+
+ %% add some credit
+ C1Cred = rabbit_fifo:make_credit(C1, 5, 0, false),
+ {State2, _, _Effects2} = apply(meta(3), C1Cred, State1),
+ C2Cred = rabbit_fifo:make_credit(C2, 4, 0, false),
+ {State3, _} = apply(meta(4), C2Cred, State2),
+ %% both consumers should have credit
+ ?assertMatch(#{C1 := #consumer{credit = 5}},
+ State3#rabbit_fifo.consumers),
+ ?assertMatch([{C2, #consumer{credit = 4}}],
+ State3#rabbit_fifo.waiting_consumers),
+ ok.
+
meta(Idx) ->
#{index => Idx, term => 1}.