diff options
| author | kjnilsson <knilsson@pivotal.io> | 2020-02-25 14:40:09 +0000 |
|---|---|---|
| committer | kjnilsson <knilsson@pivotal.io> | 2020-02-25 14:43:58 +0000 |
| commit | ac86a6cad73cbcee0e94e65f245c5b89fffb220b (patch) | |
| tree | 99b37f42225d0b5ff6a8495f2b936a3a2b09c32f | |
| parent | 227e69480c22fdea58d5c388ec98ecc8ecd4e8ec (diff) | |
| download | rabbitmq-server-git-ac86a6cad73cbcee0e94e65f245c5b89fffb220b.tar.gz | |
Fix QQ crash recovery bug
When using dead letter handlers the state machine would crash when a
prefix_msg was being dead-lettered on recovery. This handles this case
and also fixes an issue where the incorrect initial release cursor
interval would have been set when overriding the release cursor default.
[#171463230]
| -rw-r--r-- | src/rabbit_fifo.erl | 13 | ||||
| -rw-r--r-- | src/rabbit_fifo.hrl | 6 | ||||
| -rw-r--r-- | test/rabbit_fifo_prop_SUITE.erl | 29 |
3 files changed, 37 insertions, 11 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index 0917836b6a..251b370caa 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -146,7 +146,9 @@ update_config(Conf, State) -> SHICur = case State#?MODULE.cfg of #cfg{release_cursor_interval = {_, C}} -> C; - #cfg{release_cursor_interval = C} -> + #cfg{release_cursor_interval = undefined} -> + SHI; + #cfg{release_cursor_interval = C} -> C end, @@ -1086,8 +1088,9 @@ snd(T) -> return(Meta, ConsumerId, Returned, Effects0, #?MODULE{service_queue = SQ0} = State0) -> {State1, Effects1} = maps:fold( - fun(MsgId, {Tag, _} = Msg, {S0, E0}) when Tag == '$prefix_msg'; - Tag == '$empty_msg'-> + fun(MsgId, {Tag, _} = Msg, {S0, E0}) + when Tag == '$prefix_msg'; + Tag == '$empty_msg'-> return_one(MsgId, 0, Msg, S0, E0, ConsumerId); (MsgId, {MsgNum, Msg}, {S0, E0}) -> return_one(MsgId, MsgNum, Msg, S0, E0, @@ -1158,7 +1161,9 @@ dead_letter_effects(Reason, Discarded, #?MODULE{cfg = #cfg{dead_letter_handler = {Mod, Fun, Args}}}, Effects) -> DeadLetters = maps:fold(fun(_, {_, {_, {_Header, Msg}}}, Acc) -> - [{Reason, Msg} | Acc] + [{Reason, Msg} | Acc]; + (_, _, Acc) -> + Acc end, [], Discarded), [{mod_call, Mod, Fun, Args ++ [DeadLetters]} | Effects]. diff --git a/src/rabbit_fifo.hrl b/src/rabbit_fifo.hrl index 16e665f9df..2fae8c10ca 100644 --- a/src/rabbit_fifo.hrl +++ b/src/rabbit_fifo.hrl @@ -104,9 +104,9 @@ -record(cfg, {name :: atom(), resource :: rabbit_types:r('queue'), - release_cursor_interval = - {?RELEASE_CURSOR_EVERY, ?RELEASE_CURSOR_EVERY} :: - non_neg_integer() | {non_neg_integer(), non_neg_integer()}, + release_cursor_interval :: + undefined | non_neg_integer() | + {non_neg_integer(), non_neg_integer()}, dead_letter_handler :: option(applied_mfa()), become_leader_handler :: option(applied_mfa()), max_length :: option(non_neg_integer()), diff --git a/test/rabbit_fifo_prop_SUITE.erl b/test/rabbit_fifo_prop_SUITE.erl index e0fced2de3..d79dac5b6f 100644 --- a/test/rabbit_fifo_prop_SUITE.erl +++ b/test/rabbit_fifo_prop_SUITE.erl @@ -44,6 +44,7 @@ all_tests() -> scenario18, scenario19, scenario20, + scenario21, single_active, single_active_01, single_active_02, @@ -356,6 +357,24 @@ scenario20(_Config) -> max_in_memory_length => 1}, Commands), ok. +scenario21(_Config) -> + C1Pid = c:pid(0,883,1), + C1 = {<<>>, C1Pid}, + E = c:pid(0,176,1), + Commands = [ + make_checkout(C1, {auto,2,simple_prefetch}), + make_enqueue(E,1,<<"1">>), + make_enqueue(E,2,<<"2">>), + make_enqueue(E,3,<<"3">>), + rabbit_fifo:make_discard(C1, [0]), + rabbit_fifo:make_settle(C1, [1]) + ], + run_snapshot_test(#{name => ?FUNCTION_NAME, + release_cursor_interval => 1, + dead_letter_handler => {?MODULE, banana, []}}, + Commands), + ok. + single_active_01(_Config) -> C1Pid = test_util:fake_pid(rabbit@fake_node1), C1 = {<<0>>, C1Pid}, @@ -450,7 +469,7 @@ snapshots(_Config) -> oneof([range(1, 10), undefined]), oneof([range(1, 1000), undefined]) }}]), - ?FORALL(O, ?LET(Ops, log_gen(250), expand(Ops)), + ?FORALL(O, ?LET(Ops, log_gen(256), expand(Ops)), collect({log_size, length(O)}, snapshots_prop( config(?FUNCTION_NAME, @@ -607,10 +626,12 @@ in_memory_limit(_Config) -> InMemoryBytes), O)))) end, [], Size). -config(Name, Length, Bytes, SingleActive, DeliveryLimit, InMemoryLength, InMemoryBytes) -> +config(Name, Length, Bytes, SingleActive, DeliveryLimit, + InMemoryLength, InMemoryBytes) -> #{name => Name, max_length => map_max(Length), max_bytes => map_max(Bytes), + dead_letter_handler => {?MODULE, banana, []}, single_active_consumer_on => SingleActive, delivery_limit => map_max(DeliveryLimit), max_in_memory_length => map_max(InMemoryLength), @@ -741,8 +762,8 @@ log_gen(Size, _Body) -> {40, {input_event, frequency([{10, settle}, {2, return}, - {1, discard}, - {1, requeue}])}}, + {2, discard}, + {2, requeue}])}}, {2, checkout_gen(oneof(CPids))}, {1, checkout_cancel_gen(oneof(CPids))}, {1, down_gen(oneof(EPids ++ CPids))}, |
