summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <mklishin@pivotal.io>2020-04-28 13:08:09 +0300
committerGitHub <noreply@github.com>2020-04-28 13:08:09 +0300
commitdc088f3f7a10a7aff743133ac436eda84d877231 (patch)
tree27289bbc907265a92fb2fc8dffdde0f1da213be1
parent84f2be6012e5be0554eb480143d825aecdccefe5 (diff)
parent721e4e99d7b77f34e9e35205a3ad9d2f77baf1bf (diff)
downloadrabbitmq-server-git-dc088f3f7a10a7aff743133ac436eda84d877231.tar.gz
Merge pull request #2330 from rabbitmq/qq-delivery-limit-bug
QQ: Fix crash bug when reaching delivery limit
-rw-r--r--src/rabbit_fifo.erl20
-rw-r--r--test/rabbit_fifo_SUITE.erl21
2 files changed, 34 insertions, 7 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index 940ed0d999..3a0fe2442a 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -1141,13 +1141,19 @@ return(#{index := IncomingRaftIdx} = Meta, ConsumerId, Returned,
return_one(MsgId, MsgNum, Msg, S0, E0,
ConsumerId)
end, {State0, Effects0}, Returned),
- #{ConsumerId := Con0} = Cons0 = State1#?MODULE.consumers,
- Con = Con0#consumer{credit = increase_credit(Con0, map_size(Returned))},
- {Cons, SQ, Effects2} = update_or_remove_sub(ConsumerId, Con, Cons0,
- SQ0, Effects1),
- State2 = State1#?MODULE{consumers = Cons,
- service_queue = SQ},
- {State, ok, Effects} = checkout(Meta, State2, Effects2),
+ {State2, Effects3} =
+ case State1#?MODULE.consumers of
+ #{ConsumerId := Con0} = Cons0 ->
+ Con = Con0#consumer{credit = increase_credit(Con0,
+ map_size(Returned))},
+ {Cons, SQ, Effects2} = update_or_remove_sub(ConsumerId, Con,
+ Cons0, SQ0, Effects1),
+ {State1#?MODULE{consumers = Cons,
+ service_queue = SQ}, Effects2};
+ _ ->
+ {State1, Effects1}
+ end,
+ {State, ok, Effects} = checkout(Meta, State2, Effects3),
update_smallest_raft_index(IncomingRaftIdx, State, Effects).
% used to processes messages that are finished
diff --git a/test/rabbit_fifo_SUITE.erl b/test/rabbit_fifo_SUITE.erl
index e5ca7c15cb..9f2daac762 100644
--- a/test/rabbit_fifo_SUITE.erl
+++ b/test/rabbit_fifo_SUITE.erl
@@ -295,6 +295,27 @@ return_test(_) ->
State3#rabbit_fifo.consumers),
ok.
+return_dequeue_delivery_limit_test(_) ->
+ Init = init(#{name => test,
+ queue_resource => rabbit_misc:r("/", queue,
+ atom_to_binary(test, utf8)),
+ release_cursor_interval => 0,
+ delivery_limit => 1}),
+ {State0, _} = enq(1, 1, msg, Init),
+
+ Cid = {<<"cid">>, self()},
+ Cid2 = {<<"cid2">>, self()},
+
+ {State1, {MsgId1, _}} = deq(2, Cid, unsettled, State0),
+ {State2, _, _} = apply(meta(4), rabbit_fifo:make_return(Cid, [MsgId1]),
+ State1),
+
+ {State3, {MsgId2, _}} = deq(2, Cid2, unsettled, State2),
+ {State4, _, _} = apply(meta(4), rabbit_fifo:make_return(Cid2, [MsgId2]),
+ State3),
+ ?assertMatch(#{num_messages := 0}, rabbit_fifo:overview(State4)),
+ ok.
+
return_non_existent_test(_) ->
Cid = {<<"cid">>, self()},
{State0, [_, _Inactive]} = enq(1, 1, second, test_init(test)),