diff options
| author | kjnilsson <knilsson@pivotal.io> | 2020-04-28 09:00:12 +0100 |
|---|---|---|
| committer | kjnilsson <knilsson@pivotal.io> | 2020-04-28 09:00:12 +0100 |
| commit | 721e4e99d7b77f34e9e35205a3ad9d2f77baf1bf (patch) | |
| tree | 24d77e927ebb35dc002ea546801c3eb2d974f046 | |
| parent | 9743422413b174555f5a7990cb354d9d3d37b85d (diff) | |
| download | rabbitmq-server-git-721e4e99d7b77f34e9e35205a3ad9d2f77baf1bf.tar.gz | |
QQ: Fix crash bug when reaching delivery limit
This only happens when using basic.get but would crash the quorum queue
when the delivery limit was reached due to the transient basic.get
consumer being removed
| -rw-r--r-- | src/rabbit_fifo.erl | 20 | ||||
| -rw-r--r-- | test/rabbit_fifo_SUITE.erl | 21 |
2 files changed, 34 insertions, 7 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index 940ed0d999..3a0fe2442a 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -1141,13 +1141,19 @@ return(#{index := IncomingRaftIdx} = Meta, ConsumerId, Returned, return_one(MsgId, MsgNum, Msg, S0, E0, ConsumerId) end, {State0, Effects0}, Returned), - #{ConsumerId := Con0} = Cons0 = State1#?MODULE.consumers, - Con = Con0#consumer{credit = increase_credit(Con0, map_size(Returned))}, - {Cons, SQ, Effects2} = update_or_remove_sub(ConsumerId, Con, Cons0, - SQ0, Effects1), - State2 = State1#?MODULE{consumers = Cons, - service_queue = SQ}, - {State, ok, Effects} = checkout(Meta, State2, Effects2), + {State2, Effects3} = + case State1#?MODULE.consumers of + #{ConsumerId := Con0} = Cons0 -> + Con = Con0#consumer{credit = increase_credit(Con0, + map_size(Returned))}, + {Cons, SQ, Effects2} = update_or_remove_sub(ConsumerId, Con, + Cons0, SQ0, Effects1), + {State1#?MODULE{consumers = Cons, + service_queue = SQ}, Effects2}; + _ -> + {State1, Effects1} + end, + {State, ok, Effects} = checkout(Meta, State2, Effects3), update_smallest_raft_index(IncomingRaftIdx, State, Effects). % used to processes messages that are finished diff --git a/test/rabbit_fifo_SUITE.erl b/test/rabbit_fifo_SUITE.erl index e5ca7c15cb..9f2daac762 100644 --- a/test/rabbit_fifo_SUITE.erl +++ b/test/rabbit_fifo_SUITE.erl @@ -295,6 +295,27 @@ return_test(_) -> State3#rabbit_fifo.consumers), ok. +return_dequeue_delivery_limit_test(_) -> + Init = init(#{name => test, + queue_resource => rabbit_misc:r("/", queue, + atom_to_binary(test, utf8)), + release_cursor_interval => 0, + delivery_limit => 1}), + {State0, _} = enq(1, 1, msg, Init), + + Cid = {<<"cid">>, self()}, + Cid2 = {<<"cid2">>, self()}, + + {State1, {MsgId1, _}} = deq(2, Cid, unsettled, State0), + {State2, _, _} = apply(meta(4), rabbit_fifo:make_return(Cid, [MsgId1]), + State1), + + {State3, {MsgId2, _}} = deq(2, Cid2, unsettled, State2), + {State4, _, _} = apply(meta(4), rabbit_fifo:make_return(Cid2, [MsgId2]), + State3), + ?assertMatch(#{num_messages := 0}, rabbit_fifo:overview(State4)), + ok. + return_non_existent_test(_) -> Cid = {<<"cid">>, self()}, {State0, [_, _Inactive]} = enq(1, 1, second, test_init(test)), |
