summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_fifo.erl13
-rw-r--r--src/rabbit_fifo.hrl6
-rw-r--r--test/rabbit_fifo_prop_SUITE.erl29
3 files changed, 37 insertions, 11 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index 0917836b6a..251b370caa 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -146,7 +146,9 @@ update_config(Conf, State) ->
SHICur = case State#?MODULE.cfg of
#cfg{release_cursor_interval = {_, C}} ->
C;
- #cfg{release_cursor_interval = C} ->
+ #cfg{release_cursor_interval = undefined} ->
+ SHI;
+ #cfg{release_cursor_interval = C} ->
C
end,
@@ -1086,8 +1088,9 @@ snd(T) ->
return(Meta, ConsumerId, Returned,
Effects0, #?MODULE{service_queue = SQ0} = State0) ->
{State1, Effects1} = maps:fold(
- fun(MsgId, {Tag, _} = Msg, {S0, E0}) when Tag == '$prefix_msg';
- Tag == '$empty_msg'->
+ fun(MsgId, {Tag, _} = Msg, {S0, E0})
+ when Tag == '$prefix_msg';
+ Tag == '$empty_msg'->
return_one(MsgId, 0, Msg, S0, E0, ConsumerId);
(MsgId, {MsgNum, Msg}, {S0, E0}) ->
return_one(MsgId, MsgNum, Msg, S0, E0,
@@ -1158,7 +1161,9 @@ dead_letter_effects(Reason, Discarded,
#?MODULE{cfg = #cfg{dead_letter_handler = {Mod, Fun, Args}}},
Effects) ->
DeadLetters = maps:fold(fun(_, {_, {_, {_Header, Msg}}}, Acc) ->
- [{Reason, Msg} | Acc]
+ [{Reason, Msg} | Acc];
+ (_, _, Acc) ->
+ Acc
end, [], Discarded),
[{mod_call, Mod, Fun, Args ++ [DeadLetters]} | Effects].
diff --git a/src/rabbit_fifo.hrl b/src/rabbit_fifo.hrl
index 16e665f9df..2fae8c10ca 100644
--- a/src/rabbit_fifo.hrl
+++ b/src/rabbit_fifo.hrl
@@ -104,9 +104,9 @@
-record(cfg,
{name :: atom(),
resource :: rabbit_types:r('queue'),
- release_cursor_interval =
- {?RELEASE_CURSOR_EVERY, ?RELEASE_CURSOR_EVERY} ::
- non_neg_integer() | {non_neg_integer(), non_neg_integer()},
+ release_cursor_interval ::
+ undefined | non_neg_integer() |
+ {non_neg_integer(), non_neg_integer()},
dead_letter_handler :: option(applied_mfa()),
become_leader_handler :: option(applied_mfa()),
max_length :: option(non_neg_integer()),
diff --git a/test/rabbit_fifo_prop_SUITE.erl b/test/rabbit_fifo_prop_SUITE.erl
index e0fced2de3..d79dac5b6f 100644
--- a/test/rabbit_fifo_prop_SUITE.erl
+++ b/test/rabbit_fifo_prop_SUITE.erl
@@ -44,6 +44,7 @@ all_tests() ->
scenario18,
scenario19,
scenario20,
+ scenario21,
single_active,
single_active_01,
single_active_02,
@@ -356,6 +357,24 @@ scenario20(_Config) ->
max_in_memory_length => 1}, Commands),
ok.
+scenario21(_Config) ->
+ C1Pid = c:pid(0,883,1),
+ C1 = {<<>>, C1Pid},
+ E = c:pid(0,176,1),
+ Commands = [
+ make_checkout(C1, {auto,2,simple_prefetch}),
+ make_enqueue(E,1,<<"1">>),
+ make_enqueue(E,2,<<"2">>),
+ make_enqueue(E,3,<<"3">>),
+ rabbit_fifo:make_discard(C1, [0]),
+ rabbit_fifo:make_settle(C1, [1])
+ ],
+ run_snapshot_test(#{name => ?FUNCTION_NAME,
+ release_cursor_interval => 1,
+ dead_letter_handler => {?MODULE, banana, []}},
+ Commands),
+ ok.
+
single_active_01(_Config) ->
C1Pid = test_util:fake_pid(rabbit@fake_node1),
C1 = {<<0>>, C1Pid},
@@ -450,7 +469,7 @@ snapshots(_Config) ->
oneof([range(1, 10), undefined]),
oneof([range(1, 1000), undefined])
}}]),
- ?FORALL(O, ?LET(Ops, log_gen(250), expand(Ops)),
+ ?FORALL(O, ?LET(Ops, log_gen(256), expand(Ops)),
collect({log_size, length(O)},
snapshots_prop(
config(?FUNCTION_NAME,
@@ -607,10 +626,12 @@ in_memory_limit(_Config) ->
InMemoryBytes), O))))
end, [], Size).
-config(Name, Length, Bytes, SingleActive, DeliveryLimit, InMemoryLength, InMemoryBytes) ->
+config(Name, Length, Bytes, SingleActive, DeliveryLimit,
+ InMemoryLength, InMemoryBytes) ->
#{name => Name,
max_length => map_max(Length),
max_bytes => map_max(Bytes),
+ dead_letter_handler => {?MODULE, banana, []},
single_active_consumer_on => SingleActive,
delivery_limit => map_max(DeliveryLimit),
max_in_memory_length => map_max(InMemoryLength),
@@ -741,8 +762,8 @@ log_gen(Size, _Body) ->
{40, {input_event,
frequency([{10, settle},
{2, return},
- {1, discard},
- {1, requeue}])}},
+ {2, discard},
+ {2, requeue}])}},
{2, checkout_gen(oneof(CPids))},
{1, checkout_cancel_gen(oneof(CPids))},
{1, down_gen(oneof(EPids ++ CPids))},