summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorkjnilsson <knilsson@pivotal.io>2020-04-28 09:00:12 +0100
committerkjnilsson <knilsson@pivotal.io>2020-04-28 09:00:12 +0100
commit721e4e99d7b77f34e9e35205a3ad9d2f77baf1bf (patch)
tree24d77e927ebb35dc002ea546801c3eb2d974f046
parent9743422413b174555f5a7990cb354d9d3d37b85d (diff)
downloadrabbitmq-server-git-721e4e99d7b77f34e9e35205a3ad9d2f77baf1bf.tar.gz
QQ: Fix crash bug when reaching delivery limit
This only happens when using basic.get but would crash the quorum queue when the delivery limit was reached due to the transient basic.get consumer being removed
-rw-r--r--src/rabbit_fifo.erl20
-rw-r--r--test/rabbit_fifo_SUITE.erl21
2 files changed, 34 insertions, 7 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index 940ed0d999..3a0fe2442a 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -1141,13 +1141,19 @@ return(#{index := IncomingRaftIdx} = Meta, ConsumerId, Returned,
return_one(MsgId, MsgNum, Msg, S0, E0,
ConsumerId)
end, {State0, Effects0}, Returned),
- #{ConsumerId := Con0} = Cons0 = State1#?MODULE.consumers,
- Con = Con0#consumer{credit = increase_credit(Con0, map_size(Returned))},
- {Cons, SQ, Effects2} = update_or_remove_sub(ConsumerId, Con, Cons0,
- SQ0, Effects1),
- State2 = State1#?MODULE{consumers = Cons,
- service_queue = SQ},
- {State, ok, Effects} = checkout(Meta, State2, Effects2),
+ {State2, Effects3} =
+ case State1#?MODULE.consumers of
+ #{ConsumerId := Con0} = Cons0 ->
+ Con = Con0#consumer{credit = increase_credit(Con0,
+ map_size(Returned))},
+ {Cons, SQ, Effects2} = update_or_remove_sub(ConsumerId, Con,
+ Cons0, SQ0, Effects1),
+ {State1#?MODULE{consumers = Cons,
+ service_queue = SQ}, Effects2};
+ _ ->
+ {State1, Effects1}
+ end,
+ {State, ok, Effects} = checkout(Meta, State2, Effects3),
update_smallest_raft_index(IncomingRaftIdx, State, Effects).
% used to processes messages that are finished
diff --git a/test/rabbit_fifo_SUITE.erl b/test/rabbit_fifo_SUITE.erl
index e5ca7c15cb..9f2daac762 100644
--- a/test/rabbit_fifo_SUITE.erl
+++ b/test/rabbit_fifo_SUITE.erl
@@ -295,6 +295,27 @@ return_test(_) ->
State3#rabbit_fifo.consumers),
ok.
+return_dequeue_delivery_limit_test(_) ->
+ Init = init(#{name => test,
+ queue_resource => rabbit_misc:r("/", queue,
+ atom_to_binary(test, utf8)),
+ release_cursor_interval => 0,
+ delivery_limit => 1}),
+ {State0, _} = enq(1, 1, msg, Init),
+
+ Cid = {<<"cid">>, self()},
+ Cid2 = {<<"cid2">>, self()},
+
+ {State1, {MsgId1, _}} = deq(2, Cid, unsettled, State0),
+ {State2, _, _} = apply(meta(4), rabbit_fifo:make_return(Cid, [MsgId1]),
+ State1),
+
+ {State3, {MsgId2, _}} = deq(2, Cid2, unsettled, State2),
+ {State4, _, _} = apply(meta(4), rabbit_fifo:make_return(Cid2, [MsgId2]),
+ State3),
+ ?assertMatch(#{num_messages := 0}, rabbit_fifo:overview(State4)),
+ ok.
+
return_non_existent_test(_) ->
Cid = {<<"cid">>, self()},
{State0, [_, _Inactive]} = enq(1, 1, second, test_init(test)),