author     kjnilsson <knilsson@pivotal.io>  2020-08-21 16:35:02 +0100
committer  kjnilsson <knilsson@pivotal.io>  2020-09-01 16:36:47 +0100
commit     b6f60ff90bbcabf591b7189c90c818b7c11a36f6 (patch)
tree       61720044b575cb452fadeae3082ac1fef180083e
parent     7093e4efc3a2791c47ded7975b66bf7e108c1862 (diff)
download   rabbitmq-server-git-qq-queue-ttl.tar.gz
Quorum queue 'Queue TTL' support. (qq-queue-ttl)
The quorum queue tracks the timestamp at which a consumer was last removed and periodically checks whether the queue should be deleted based on its expiry configuration.
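In essence, the check added to rabbit_fifo below reduces to "no active consumers and idle for longer than 'expires'". A minimal, self-contained sketch of that decision (using plain maps rather than the module's #consumer{}/state records, so it is illustrative only):

    -module(queue_ttl_sketch).
    -export([is_expired/2]).

    %% Ts and LastActive are millisecond timestamps; Expires is the configured
    %% TTL in milliseconds, or 'undefined' when no expiry is set.
    is_expired(Ts, #{last_active := LastActive,
                     expires := Expires,
                     consumers := Consumers})
      when is_number(LastActive) andalso is_number(Expires) ->
        %% consumers suspected of being down do not keep the queue alive
        Active = maps:filter(fun (_, #{status := suspected_down}) -> false;
                                 (_, _) -> true
                             end, Consumers),
        Ts > (LastActive + Expires) andalso maps:size(Active) == 0;
    is_expired(_Ts, _State) ->
        false.

On every tick the state machine runs this check and, when it returns true, emits a mod_call effect that spawns the deleter instead of the usual metrics call.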
-rw-r--r--  src/rabbit_fifo.erl              | 287
-rw-r--r--  src/rabbit_fifo.hrl              |   9
-rw-r--r--  src/rabbit_quorum_queue.erl      |  18
-rw-r--r--  test/quorum_queue_SUITE.erl      |  31
-rw-r--r--  test/rabbit_fifo_SUITE.erl       | 121
-rw-r--r--  test/rabbit_fifo_prop_SUITE.erl  |   4
6 files changed, 318 insertions, 152 deletions
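For usage context, the new quorum_queue_SUITE test declares a quorum queue with an x-expires argument. A client-side sketch of the same declaration (assuming the Erlang amqp_client application and an already-open channel Ch; not part of this commit):

    -module(declare_expiring_qq).
    -export([declare/1]).
    -include_lib("amqp_client/include/amqp_client.hrl").

    %% Declares a durable quorum queue that the broker deletes once it has been
    %% unused for 1000 ms.
    declare(Ch) ->
        Declare = #'queue.declare'{queue     = <<"expiring-qq">>,
                                   durable   = true,
                                   arguments = [{<<"x-queue-type">>, longstr, <<"quorum">>},
                                                {<<"x-expires">>, long, 1000}]},
        #'queue.declare_ok'{} = amqp_channel:call(Ch, Declare).

The same value can also come from an "expires" policy, which rabbit_quorum_queue now looks up and passes into the Ra machine configuration.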
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index aea2850084..b59f4b04f8 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -134,6 +134,7 @@ update_config(Conf, State) ->
MaxMemoryLength = maps:get(max_in_memory_length, Conf, undefined),
MaxMemoryBytes = maps:get(max_in_memory_bytes, Conf, undefined),
DeliveryLimit = maps:get(delivery_limit, Conf, undefined),
+ Expires = maps:get(expires, Conf, undefined),
ConsumerStrategy = case maps:get(single_active_consumer_on, Conf, false) of
true ->
single_active;
@@ -148,6 +149,7 @@ update_config(Conf, State) ->
{RCI, C}
end,
+ LastActive = maps:get(created, Conf, undefined),
State#?MODULE{cfg = Cfg#cfg{release_cursor_interval = RCISpec,
dead_letter_handler = DLH,
become_leader_handler = BLH,
@@ -157,7 +159,9 @@ update_config(Conf, State) ->
max_in_memory_length = MaxMemoryLength,
max_in_memory_bytes = MaxMemoryBytes,
consumer_strategy = ConsumerStrategy,
- delivery_limit = DeliveryLimit}}.
+ delivery_limit = DeliveryLimit,
+ expires = Expires},
+ last_active = LastActive}.
zero(_) ->
0.
@@ -285,10 +289,14 @@ apply(_, #checkout{spec = {dequeue, _}},
#?MODULE{cfg = #cfg{consumer_strategy = single_active}} = State0) ->
{State0, {error, unsupported}};
apply(#{index := Index,
+ system_time := Ts,
from := From} = Meta, #checkout{spec = {dequeue, Settlement},
meta = ConsumerMeta,
consumer_id = ConsumerId},
- #?MODULE{consumers = Consumers} = State0) ->
+ #?MODULE{consumers = Consumers} = State00) ->
+ %% dequeue always updates last_active
+ State0 = State00#?MODULE{last_active = Ts},
+ %% all dequeue operations keep the queue from expiring
Exists = maps:is_key(ConsumerId, Consumers),
case messages_ready(State0) of
0 ->
@@ -300,7 +308,7 @@ apply(#{index := Index,
State1 = update_consumer(ConsumerId, ConsumerMeta,
{once, 1, simple_prefetch},
State0),
- {success, _, MsgId, Msg, State2} = checkout_one(State1),
+ {success, _, MsgId, Msg, State2} = checkout_one(Meta, State1),
{State4, Effects1} = case Settlement of
unsettled ->
{_, Pid} = ConsumerId,
@@ -332,7 +340,8 @@ apply(#{index := Index,
end
end;
apply(Meta, #checkout{spec = cancel, consumer_id = ConsumerId}, State0) ->
- {State, Effects} = cancel_consumer(ConsumerId, State0, [], consumer_cancel),
+ {State, Effects} = cancel_consumer(Meta, ConsumerId, State0, [],
+ consumer_cancel),
checkout(Meta, State0, State, Effects);
apply(Meta, #checkout{spec = Spec, meta = ConsumerMeta,
consumer_id = {_, Pid} = ConsumerId},
@@ -364,7 +373,7 @@ apply(#{index := Index}, #purge{},
{State, false, Effects} ->
{State, Reply, Effects}
end;
-apply(Meta, {down, Pid, noconnection},
+apply(#{system_time := Ts} = Meta, {down, Pid, noconnection},
#?MODULE{consumers = Cons0,
cfg = #cfg{consumer_strategy = single_active},
waiting_consumers = Waiting0,
@@ -381,7 +390,7 @@ apply(Meta, {down, Pid, noconnection},
S0, Cid, C0, false, suspected_down, E0),
Checked = C0#consumer.checked_out,
Credit = increase_credit(C0, maps:size(Checked)),
- {St, Effs1} = return_all(S0, Effs,
+ {St, Effs1} = return_all(Meta, S0, Effs,
Cid, C0#consumer{credit = Credit}),
%% if the consumer was cancelled there is a chance it got
%% removed when returning hence we need to be defensive here
@@ -392,7 +401,8 @@ apply(Meta, {down, Pid, noconnection},
Waiting0
end,
{St#?MODULE{consumers = maps:remove(Cid, St#?MODULE.consumers),
- waiting_consumers = Waiting},
+ waiting_consumers = Waiting,
+ last_active = Ts},
Effs1};
(_, _, S) ->
S
@@ -411,7 +421,7 @@ apply(Meta, {down, Pid, noconnection},
end, Enqs0),
Effects = [{monitor, node, Node} | Effects1],
checkout(Meta, State0, State#?MODULE{enqueuers = Enqs}, Effects);
-apply(Meta, {down, Pid, noconnection},
+apply(#{system_time := Ts} = Meta, {down, Pid, noconnection},
#?MODULE{consumers = Cons0,
enqueuers = Enqs0} = State0) ->
%% A node has been disconnected. This doesn't necessarily mean that
@@ -432,7 +442,7 @@ apply(Meta, {down, Pid, noconnection},
Credit = increase_credit(C0, map_size(Checked0)),
C = C0#consumer{status = suspected_down,
credit = Credit},
- {St, Eff0} = return_all(St0, Eff, Cid, C),
+ {St, Eff0} = return_all(Meta, St0, Eff, Cid, C),
Eff1 = consumer_update_active_effects(St, Cid, C, false,
suspected_down, Eff0),
{St, Eff1};
@@ -453,13 +463,14 @@ apply(Meta, {down, Pid, noconnection},
_ ->
[{monitor, node, Node}]
end ++ Effects1,
- checkout(Meta, State0, State#?MODULE{enqueuers = Enqs}, Effects);
+ checkout(Meta, State0, State#?MODULE{enqueuers = Enqs,
+ last_active = Ts}, Effects);
apply(Meta, {down, Pid, _Info}, State0) ->
- {State, Effects} = handle_down(Pid, State0),
+ {State, Effects} = handle_down(Meta, Pid, State0),
checkout(Meta, State0, State, Effects);
apply(Meta, {nodeup, Node}, #?MODULE{consumers = Cons0,
enqueuers = Enqs0,
- service_queue = SQ0} = State0) ->
+ service_queue = _SQ0} = State0) ->
%% A node we are monitoring has come back.
%% If we have suspected any processes of being
%% down we should now re-issue the monitors for them to detect if they're
@@ -473,30 +484,29 @@ apply(Meta, {nodeup, Node}, #?MODULE{consumers = Cons0,
end, Enqs0),
ConsumerUpdateActiveFun = consumer_active_flag_update_function(State0),
%% mark all consumers as up
- {Cons1, SQ, Effects1} =
- maps:fold(fun({_, P} = ConsumerId, C, {CAcc, SQAcc, EAcc})
+ {State1, Effects1} =
+ maps:fold(fun({_, P} = ConsumerId, C, {SAcc, EAcc})
when (node(P) =:= Node) and
(C#consumer.status =/= cancelled) ->
- EAcc1 = ConsumerUpdateActiveFun(State0, ConsumerId,
+ EAcc1 = ConsumerUpdateActiveFun(SAcc, ConsumerId,
C, true, up, EAcc),
- update_or_remove_sub(ConsumerId,
- C#consumer{status = up}, CAcc,
- SQAcc, EAcc1);
+ {update_or_remove_sub(Meta, ConsumerId,
+ C#consumer{status = up},
+ SAcc), EAcc1};
(_, _, Acc) ->
Acc
- end, {Cons0, SQ0, Monitors}, Cons0),
- Waiting = update_waiting_consumer_status(Node, State0, up),
- State1 = State0#?MODULE{consumers = Cons1,
+ end, {State0, Monitors}, Cons0),
+ Waiting = update_waiting_consumer_status(Node, State1, up),
+ State2 = State1#?MODULE{
enqueuers = Enqs1,
- service_queue = SQ,
waiting_consumers = Waiting},
- {State, Effects} = activate_next_consumer(State1, Effects1),
+ {State, Effects} = activate_next_consumer(State2, Effects1),
checkout(Meta, State0, State, Effects);
apply(_, {nodedown, _Node}, State) ->
{State, ok};
-apply(_, #purge_nodes{nodes = Nodes}, State0) ->
+apply(Meta, #purge_nodes{nodes = Nodes}, State0) ->
{State, Effects} = lists:foldl(fun(Node, {S, E}) ->
- purge_node(Node, S, E)
+ purge_node(Meta, Node, S, E)
end, {State0, []}, Nodes),
{State, ok, Effects};
apply(Meta, #update_config{config = Conf}, State) ->
@@ -549,15 +559,15 @@ convert_v0_to_v1(V0State0) ->
msgs_ready_in_memory = rabbit_fifo_v0:get_field(msgs_ready_in_memory, V0State)
}.
-purge_node(Node, State, Effects) ->
+purge_node(Meta, Node, State, Effects) ->
lists:foldl(fun(Pid, {S0, E0}) ->
- {S, E} = handle_down(Pid, S0),
+ {S, E} = handle_down(Meta, Pid, S0),
{S, E0 ++ E}
end, {State, Effects}, all_pids_for(Node, State)).
%% any downs that are not noconnection
-handle_down(Pid, #?MODULE{consumers = Cons0,
- enqueuers = Enqs0} = State0) ->
+handle_down(Meta, Pid, #?MODULE{consumers = Cons0,
+ enqueuers = Enqs0} = State0) ->
% Remove any enqueuer for the same pid and enqueue any pending messages
% This should be ok as we won't see any more enqueues from this pid
State1 = case maps:take(Pid, Enqs0) of
@@ -574,7 +584,7 @@ handle_down(Pid, #?MODULE{consumers = Cons0,
DownConsumers = maps:keys(
maps:filter(fun({_, P}, _) -> P =:= Pid end, Cons0)),
lists:foldl(fun(ConsumerId, {S, E}) ->
- cancel_consumer(ConsumerId, S, E, down)
+ cancel_consumer(Meta, ConsumerId, S, E, down)
end, {State2, Effects1}, DownConsumers).
consumer_active_flag_update_function(#?MODULE{cfg = #cfg{consumer_strategy = competing}}) ->
@@ -666,19 +676,24 @@ state_enter(State, #?MODULE{cfg = #cfg{resource = _Resource}}) when State =/= le
-spec tick(non_neg_integer(), state()) -> ra_machine:effects().
-tick(_Ts, #?MODULE{cfg = #cfg{name = Name,
- resource = QName},
+tick(Ts, #?MODULE{cfg = #cfg{name = Name,
+ resource = QName},
msg_bytes_enqueue = EnqueueBytes,
msg_bytes_checkout = CheckoutBytes} = State) ->
- Metrics = {Name,
- messages_ready(State),
- num_checked_out(State), % checked out
- messages_total(State),
- query_consumer_count(State), % Consumers
- EnqueueBytes,
- CheckoutBytes},
- [{mod_call, rabbit_quorum_queue,
- handle_tick, [QName, Metrics, all_nodes(State)]}].
+ case is_expired(Ts, State) of
+ true ->
+ [{mod_call, rabbit_quorum_queue, spawn_deleter, [QName]}];
+ false ->
+ Metrics = {Name,
+ messages_ready(State),
+ num_checked_out(State), % checked out
+ messages_total(State),
+ query_consumer_count(State), % Consumers
+ EnqueueBytes,
+ CheckoutBytes},
+ [{mod_call, rabbit_quorum_queue,
+ handle_tick, [QName, Metrics, all_nodes(State)]}]
+ end.
-spec overview(state()) -> map().
overview(#?MODULE{consumers = Cons,
@@ -933,17 +948,17 @@ num_checked_out(#?MODULE{consumers = Cons}) ->
maps:size(C) + Acc
end, 0, Cons).
-cancel_consumer(ConsumerId,
+cancel_consumer(Meta, ConsumerId,
#?MODULE{cfg = #cfg{consumer_strategy = competing}} = State,
Effects, Reason) ->
- cancel_consumer0(ConsumerId, State, Effects, Reason);
-cancel_consumer(ConsumerId,
+ cancel_consumer0(Meta, ConsumerId, State, Effects, Reason);
+cancel_consumer(Meta, ConsumerId,
#?MODULE{cfg = #cfg{consumer_strategy = single_active},
waiting_consumers = []} = State,
Effects, Reason) ->
%% single active consumer on, no consumers are waiting
- cancel_consumer0(ConsumerId, State, Effects, Reason);
-cancel_consumer(ConsumerId,
+ cancel_consumer0(Meta, ConsumerId, State, Effects, Reason);
+cancel_consumer(Meta, ConsumerId,
#?MODULE{consumers = Cons0,
cfg = #cfg{consumer_strategy = single_active},
waiting_consumers = Waiting0} = State0,
@@ -952,7 +967,7 @@ cancel_consumer(ConsumerId,
case maps:is_key(ConsumerId, Cons0) of
true ->
% The active consumer is to be removed
- {State1, Effects1} = cancel_consumer0(ConsumerId, State0,
+ {State1, Effects1} = cancel_consumer0(Meta, ConsumerId, State0,
Effects0, Reason),
activate_next_consumer(State1, Effects1);
false ->
@@ -976,11 +991,12 @@ consumer_update_active_effects(#?MODULE{cfg = #cfg{resource = QName}},
[QName, ConsumerId, false, Ack, Prefetch, Active, ActivityStatus, Args]}
| Effects].
-cancel_consumer0(ConsumerId, #?MODULE{consumers = C0} = S0, Effects0, Reason) ->
+cancel_consumer0(Meta, ConsumerId,
+ #?MODULE{consumers = C0} = S0, Effects0, Reason) ->
case C0 of
#{ConsumerId := Consumer} ->
- {S, Effects2} = maybe_return_all(ConsumerId, Consumer, S0,
- Effects0, Reason),
+ {S, Effects2} = maybe_return_all(Meta, ConsumerId, Consumer,
+ S0, Effects0, Reason),
%% The effects are emitted before the consumer is actually removed
%% if the consumer has unacked messages. This is a bit weird but
%% in line with what classic queues do (from an external point of
@@ -1029,23 +1045,18 @@ activate_next_consumer(#?MODULE{consumers = Cons,
-maybe_return_all(ConsumerId, Consumer,
- #?MODULE{consumers = C0,
- service_queue = SQ0} = S0,
- Effects0, Reason) ->
+maybe_return_all(#{system_time := Ts} = Meta, ConsumerId, Consumer, S0, Effects0, Reason) ->
case Reason of
consumer_cancel ->
- {Cons, SQ, Effects1} =
- update_or_remove_sub(ConsumerId,
- Consumer#consumer{lifetime = once,
- credit = 0,
- status = cancelled},
- C0, SQ0, Effects0),
- {S0#?MODULE{consumers = Cons,
- service_queue = SQ}, Effects1};
+ {update_or_remove_sub(Meta, ConsumerId,
+ Consumer#consumer{lifetime = once,
+ credit = 0,
+ status = cancelled},
+ S0), Effects0};
down ->
- {S1, Effects1} = return_all(S0, Effects0, ConsumerId, Consumer),
- {S1#?MODULE{consumers = maps:remove(ConsumerId, S1#?MODULE.consumers)},
+ {S1, Effects1} = return_all(Meta, S0, Effects0, ConsumerId, Consumer),
+ {S1#?MODULE{consumers = maps:remove(ConsumerId, S1#?MODULE.consumers),
+ last_active = Ts},
Effects1}
end.
@@ -1139,7 +1150,7 @@ maybe_store_dehydrated_state(RaftIdx,
?RELEASE_CURSOR_EVERY_MAX)
end,
State = State0#?MODULE{cfg = Cfg#cfg{release_cursor_interval =
- {Base, Interval}}},
+ {Base, Interval}}},
Dehydrated = dehydrate_state(State),
Cursor = {release_cursor, RaftIdx, Dehydrated},
Cursors = lqueue:in(Cursor, Cursors0),
@@ -1193,57 +1204,50 @@ snd(T) ->
element(2, T).
return(#{index := IncomingRaftIdx} = Meta, ConsumerId, Returned,
- Effects0, #?MODULE{service_queue = SQ0} = State0) ->
+ Effects0, State0) ->
{State1, Effects1} = maps:fold(
fun(MsgId, {Tag, _} = Msg, {S0, E0})
when Tag == '$prefix_msg';
Tag == '$empty_msg'->
- return_one(MsgId, 0, Msg, S0, E0, ConsumerId);
+ return_one(Meta, MsgId, 0, Msg, S0, E0, ConsumerId);
(MsgId, {MsgNum, Msg}, {S0, E0}) ->
- return_one(MsgId, MsgNum, Msg, S0, E0,
+ return_one(Meta, MsgId, MsgNum, Msg, S0, E0,
ConsumerId)
end, {State0, Effects0}, Returned),
- {State2, Effects3} =
+ State2 =
case State1#?MODULE.consumers of
- #{ConsumerId := Con0} = Cons0 ->
+ #{ConsumerId := Con0} ->
Con = Con0#consumer{credit = increase_credit(Con0,
map_size(Returned))},
- {Cons, SQ, Effects2} = update_or_remove_sub(ConsumerId, Con,
- Cons0, SQ0, Effects1),
- {State1#?MODULE{consumers = Cons,
- service_queue = SQ}, Effects2};
+ update_or_remove_sub(Meta, ConsumerId, Con, State1);
_ ->
- {State1, Effects1}
+ State1
end,
- {State, ok, Effects} = checkout(Meta, State0, State2, Effects3),
+ {State, ok, Effects} = checkout(Meta, State0, State2, Effects1),
update_smallest_raft_index(IncomingRaftIdx, State, Effects).
% used to processes messages that are finished
-complete(ConsumerId, Discarded,
- #consumer{checked_out = Checked} = Con0, Effects0,
- #?MODULE{consumers = Cons0, service_queue = SQ0,
- ra_indexes = Indexes0} = State0) ->
+complete(Meta, ConsumerId, Discarded,
+ #consumer{checked_out = Checked} = Con0, Effects,
+ #?MODULE{ra_indexes = Indexes0} = State0) ->
%% TODO optimise use of Discarded map here
MsgRaftIdxs = [RIdx || {_, {RIdx, _}} <- maps:values(Discarded)],
%% credit_mode = simple_prefetch should automatically top-up credit
%% as messages are simple_prefetch or otherwise returned
Con = Con0#consumer{checked_out = maps:without(maps:keys(Discarded), Checked),
credit = increase_credit(Con0, map_size(Discarded))},
- {Cons, SQ, Effects} = update_or_remove_sub(ConsumerId, Con, Cons0,
- SQ0, Effects0),
+ State1 = update_or_remove_sub(Meta, ConsumerId, Con, State0),
Indexes = lists:foldl(fun rabbit_fifo_index:delete/2, Indexes0,
MsgRaftIdxs),
%% TODO: use maps:fold instead
- State1 = lists:foldl(fun({_, {_, {Header, _}}}, Acc) ->
+ State2 = lists:foldl(fun({_, {_, {Header, _}}}, Acc) ->
add_bytes_settle(Header, Acc);
({'$prefix_msg', Header}, Acc) ->
add_bytes_settle(Header, Acc);
({'$empty_msg', Header}, Acc) ->
add_bytes_settle(Header, Acc)
- end, State0, maps:values(Discarded)),
- {State1#?MODULE{consumers = Cons,
- ra_indexes = Indexes,
- service_queue = SQ}, Effects}.
+ end, State1, maps:values(Discarded)),
+ {State2#?MODULE{ra_indexes = Indexes}, Effects}.
increase_credit(#consumer{lifetime = once,
credit = Credit}, _) ->
@@ -1261,7 +1265,7 @@ complete_and_checkout(#{index := IncomingRaftIdx} = Meta, MsgIds, ConsumerId,
#consumer{checked_out = Checked0} = Con0,
Effects0, State0) ->
Discarded = maps:with(MsgIds, Checked0),
- {State2, Effects1} = complete(ConsumerId, Discarded, Con0,
+ {State2, Effects1} = complete(Meta, ConsumerId, Discarded, Con0,
Effects0, State0),
{State, ok, Effects} = checkout(Meta, State0, State2, Effects1),
update_smallest_raft_index(IncomingRaftIdx, State, Effects).
@@ -1345,7 +1349,7 @@ update_header(Key, UpdateFun, Default, Header) ->
maps:update_with(Key, UpdateFun, Default, Header).
-return_one(MsgId, 0, {Tag, Header0},
+return_one(Meta, MsgId, 0, {Tag, Header0},
#?MODULE{returns = Returns,
consumers = Consumers,
cfg = #cfg{delivery_limit = DeliveryLimit}} = State0,
@@ -1356,7 +1360,7 @@ return_one(MsgId, 0, {Tag, Header0},
Msg0 = {Tag, Header},
case maps:get(delivery_count, Header) of
DeliveryCount when DeliveryCount > DeliveryLimit ->
- complete(ConsumerId, #{MsgId => Msg0}, Con0, Effects0, State0);
+ complete(Meta, ConsumerId, #{MsgId => Msg0}, Con0, Effects0, State0);
_ ->
%% this should not affect the release cursor in any way
Con = Con0#consumer{checked_out = maps:remove(MsgId, Checked)},
@@ -1376,7 +1380,7 @@ return_one(MsgId, 0, {Tag, Header0},
returns = lqueue:in(Msg, Returns)}),
Effects0}
end;
-return_one(MsgId, MsgNum, {RaftId, {Header0, RawMsg}},
+return_one(Meta, MsgId, MsgNum, {RaftId, {Header0, RawMsg}},
#?MODULE{returns = Returns,
consumers = Consumers,
cfg = #cfg{delivery_limit = DeliveryLimit}} = State0,
@@ -1389,7 +1393,7 @@ return_one(MsgId, MsgNum, {RaftId, {Header0, RawMsg}},
DlMsg = {MsgNum, Msg0},
Effects = dead_letter_effects(delivery_limit, #{none => DlMsg},
State0, Effects0),
- complete(ConsumerId, #{MsgId => DlMsg}, Con0, Effects, State0);
+ complete(Meta, ConsumerId, #{MsgId => DlMsg}, Con0, Effects, State0);
_ ->
Con = Con0#consumer{checked_out = maps:remove(MsgId, Checked)},
%% this should not affect the release cursor in any way
@@ -1411,23 +1415,23 @@ return_one(MsgId, MsgNum, {RaftId, {Header0, RawMsg}},
Effects0}
end.
-return_all(#?MODULE{consumers = Cons} = State0, Effects0, ConsumerId,
+return_all(Meta, #?MODULE{consumers = Cons} = State0, Effects0, ConsumerId,
#consumer{checked_out = Checked0} = Con) ->
%% need to sort the list so that we return messages in the order
%% they were checked out
Checked = lists:sort(maps:to_list(Checked0)),
State = State0#?MODULE{consumers = Cons#{ConsumerId => Con}},
lists:foldl(fun ({MsgId, {'$prefix_msg', _} = Msg}, {S, E}) ->
- return_one(MsgId, 0, Msg, S, E, ConsumerId);
+ return_one(Meta, MsgId, 0, Msg, S, E, ConsumerId);
({MsgId, {'$empty_msg', _} = Msg}, {S, E}) ->
- return_one(MsgId, 0, Msg, S, E, ConsumerId);
+ return_one(Meta, MsgId, 0, Msg, S, E, ConsumerId);
({MsgId, {MsgNum, Msg}}, {S, E}) ->
- return_one(MsgId, MsgNum, Msg, S, E, ConsumerId)
+ return_one(Meta, MsgId, MsgNum, Msg, S, E, ConsumerId)
end, {State, Effects0}, Checked).
%% checkout new messages to consumers
-checkout(#{index := Index}, OldState, State0, Effects0) ->
- {State1, _Result, Effects1} = checkout0(checkout_one(State0),
+checkout(#{index := Index} = Meta, OldState, State0, Effects0) ->
+ {State1, _Result, Effects1} = checkout0(Meta, checkout_one(Meta, State0),
Effects0, {#{}, #{}}),
case evaluate_limit(Index, false, OldState, State1, Effects1) of
{State, true, Effects} ->
@@ -1436,21 +1440,21 @@ checkout(#{index := Index}, OldState, State0, Effects0) ->
{State, ok, Effects}
end.
-checkout0({success, ConsumerId, MsgId, {RaftIdx, {Header, 'empty'}}, State},
+checkout0(Meta, {success, ConsumerId, MsgId, {RaftIdx, {Header, 'empty'}}, State},
Effects, {SendAcc, LogAcc0}) ->
DelMsg = {RaftIdx, {MsgId, Header}},
LogAcc = maps:update_with(ConsumerId,
fun (M) -> [DelMsg | M] end,
[DelMsg], LogAcc0),
- checkout0(checkout_one(State), Effects, {SendAcc, LogAcc});
-checkout0({success, ConsumerId, MsgId, Msg, State}, Effects,
+ checkout0(Meta, checkout_one(Meta, State), Effects, {SendAcc, LogAcc});
+checkout0(Meta, {success, ConsumerId, MsgId, Msg, State}, Effects,
{SendAcc0, LogAcc}) ->
DelMsg = {MsgId, Msg},
SendAcc = maps:update_with(ConsumerId,
fun (M) -> [DelMsg | M] end,
[DelMsg], SendAcc0),
- checkout0(checkout_one(State), Effects, {SendAcc, LogAcc});
-checkout0({Activity, State0}, Effects0, {SendAcc, LogAcc}) ->
+ checkout0(Meta, checkout_one(Meta, State), Effects, {SendAcc, LogAcc});
+checkout0(_Meta, {Activity, State0}, Effects0, {SendAcc, LogAcc}) ->
Effects1 = case Activity of
nochange ->
append_send_msg_effects(
@@ -1610,7 +1614,7 @@ reply_log_effect(RaftIdx, MsgId, Header, Ready, From) ->
{dequeue, {MsgId, {Header, Msg}}, Ready}}}]
end}.
-checkout_one(#?MODULE{service_queue = SQ0,
+checkout_one(Meta, #?MODULE{service_queue = SQ0,
messages = Messages0,
consumers = Cons0} = InitState) ->
case queue:peek(SQ0) of
@@ -1625,11 +1629,11 @@ checkout_one(#?MODULE{service_queue = SQ0,
%% no credit but was still on queue
%% can happen when draining
%% recurse without consumer on queue
- checkout_one(InitState#?MODULE{service_queue = SQ1});
+ checkout_one(Meta, InitState#?MODULE{service_queue = SQ1});
{ok, #consumer{status = cancelled}} ->
- checkout_one(InitState#?MODULE{service_queue = SQ1});
+ checkout_one(Meta, InitState#?MODULE{service_queue = SQ1});
{ok, #consumer{status = suspected_down}} ->
- checkout_one(InitState#?MODULE{service_queue = SQ1});
+ checkout_one(Meta, InitState#?MODULE{service_queue = SQ1});
{ok, #consumer{checked_out = Checked0,
next_msg_id = Next,
credit = Credit,
@@ -1639,11 +1643,9 @@ checkout_one(#?MODULE{service_queue = SQ0,
next_msg_id = Next + 1,
credit = Credit - 1,
delivery_count = DelCnt + 1},
- {Cons, SQ, []} = % we expect no effects
- update_or_remove_sub(ConsumerId, Con,
- Cons0, SQ1, []),
- State1 = State0#?MODULE{service_queue = SQ,
- consumers = Cons},
+ State1 = update_or_remove_sub(Meta,
+ ConsumerId, Con,
+ State0#?MODULE{service_queue = SQ1}),
{State, Msg} =
case ConsumerMsg of
{'$prefix_msg', Header} ->
@@ -1665,7 +1667,7 @@ checkout_one(#?MODULE{service_queue = SQ0,
{success, ConsumerId, Next, Msg, State};
error ->
%% consumer did not exist but was queued, recurse
- checkout_one(InitState#?MODULE{service_queue = SQ1})
+ checkout_one(Meta, InitState#?MODULE{service_queue = SQ1})
end;
empty ->
{nochange, InitState}
@@ -1677,32 +1679,34 @@ checkout_one(#?MODULE{service_queue = SQ0,
end
end.
-update_or_remove_sub(ConsumerId, #consumer{lifetime = auto,
- credit = 0} = Con,
- Cons, ServiceQueue, Effects) ->
- {maps:put(ConsumerId, Con, Cons), ServiceQueue, Effects};
-update_or_remove_sub(ConsumerId, #consumer{lifetime = auto} = Con,
- Cons, ServiceQueue, Effects) ->
- {maps:put(ConsumerId, Con, Cons),
- uniq_queue_in(ConsumerId, ServiceQueue), Effects};
-update_or_remove_sub(ConsumerId, #consumer{lifetime = once,
+update_or_remove_sub(_Meta, ConsumerId, #consumer{lifetime = auto,
+ credit = 0} = Con,
+ #?MODULE{consumers = Cons} = State) ->
+ State#?MODULE{consumers = maps:put(ConsumerId, Con, Cons)};
+update_or_remove_sub(_Meta, ConsumerId, #consumer{lifetime = auto} = Con,
+ #?MODULE{consumers = Cons,
+ service_queue = ServiceQueue} = State) ->
+ State#?MODULE{consumers = maps:put(ConsumerId, Con, Cons),
+ service_queue = uniq_queue_in(ConsumerId, ServiceQueue)};
+update_or_remove_sub(#{system_time := Ts},
+ ConsumerId, #consumer{lifetime = once,
checked_out = Checked,
credit = 0} = Con,
- Cons, ServiceQueue, Effects) ->
- case maps:size(Checked) of
+ #?MODULE{consumers = Cons} = State) ->
+ case maps:size(Checked) of
0 ->
% we're done with this consumer
- % TODO: demonitor consumer pid but _only_ if there are no other
- % monitors for this pid
- {maps:remove(ConsumerId, Cons), ServiceQueue, Effects};
+ State#?MODULE{consumers = maps:remove(ConsumerId, Cons),
+ last_active = Ts};
_ ->
% there are unsettled items so need to keep around
- {maps:put(ConsumerId, Con, Cons), ServiceQueue, Effects}
+ State#?MODULE{consumers = maps:put(ConsumerId, Con, Cons)}
end;
-update_or_remove_sub(ConsumerId, #consumer{lifetime = once} = Con,
- Cons, ServiceQueue, Effects) ->
- {maps:put(ConsumerId, Con, Cons),
- uniq_queue_in(ConsumerId, ServiceQueue), Effects}.
+update_or_remove_sub(_Meta, ConsumerId, #consumer{lifetime = once} = Con,
+ #?MODULE{consumers = Cons,
+ service_queue = ServiceQueue} = State) ->
+ State#?MODULE{consumers = maps:put(ConsumerId, Con, Cons),
+ service_queue = uniq_queue_in(ConsumerId, ServiceQueue)}.
uniq_queue_in(Key, Queue) ->
% TODO: queue:member could surely be quite expensive, however the practical
@@ -1799,7 +1803,7 @@ dehydrate_state(#?MODULE{messages = Messages,
PPCnt + lqueue:len(Messages), PrefMsgs},
waiting_consumers = Waiting}.
-%% TODO make body recursive to avoid lists:reverse
+%% TODO make the body recursive to avoid the lists:reverse/1 call
dehydrate_messages(Msgs0, Acc0) ->
{OutRes, Msgs} = lqueue:out(Msgs0),
case OutRes of
@@ -1866,7 +1870,7 @@ make_checkout(ConsumerId, Spec, Meta) ->
spec = Spec, meta = Meta}.
-spec make_settle(consumer_id(), [msg_id()]) -> protocol().
-make_settle(ConsumerId, MsgIds) ->
+make_settle(ConsumerId, MsgIds) when is_list(MsgIds) ->
#settle{consumer_id = ConsumerId, msg_ids = MsgIds}.
-spec make_return(consumer_id(), [msg_id()]) -> protocol().
@@ -2024,3 +2028,18 @@ suspected_pids_for(Node, #?MODULE{consumers = Cons0,
[P | Acc];
(_, Acc) -> Acc
end, Enqs, WaitingConsumers0).
+
+is_expired(Ts, #?MODULE{cfg = #cfg{expires = Expires},
+ last_active = LastActive,
+ consumers = Consumers})
+ when is_number(LastActive) andalso is_number(Expires) ->
+ %% TODO: should it be active consumers?
+ Active = maps:filter(fun (_, #consumer{status = suspected_down}) ->
+ false;
+ (_, _) ->
+ true
+ end, Consumers),
+
+ Ts > (LastActive + Expires) andalso maps:size(Active) == 0;
+is_expired(_Ts, _State) ->
+ false.
diff --git a/src/rabbit_fifo.hrl b/src/rabbit_fifo.hrl
index f9b0d5e1c3..138b19d390 100644
--- a/src/rabbit_fifo.hrl
+++ b/src/rabbit_fifo.hrl
@@ -101,6 +101,8 @@
-type consumer_strategy() :: competing | single_active.
+-type milliseconds() :: non_neg_integer().
+
-record(enqueuer,
{next_seqno = 1 :: msg_seqno(),
% out of order enqueues - sorted list
@@ -130,6 +132,7 @@
delivery_limit :: option(non_neg_integer()),
max_in_memory_length :: option(non_neg_integer()),
max_in_memory_bytes :: option(non_neg_integer()),
+ expires :: undefined | milliseconds(),
unused_1,
unused_2
}).
@@ -184,6 +187,7 @@
waiting_consumers = [] :: [{consumer_id(), consumer()}],
msg_bytes_in_memory = 0 :: non_neg_integer(),
msgs_ready_in_memory = 0 :: non_neg_integer(),
+ last_active :: undefined | non_neg_integer(),
unused_1,
unused_2
}).
@@ -199,4 +203,7 @@
max_in_memory_bytes => non_neg_integer(),
overflow_strategy => drop_head | reject_publish,
single_active_consumer_on => boolean(),
- delivery_limit => non_neg_integer()}.
+ delivery_limit => non_neg_integer(),
+ expires => non_neg_integer(),
+ created => non_neg_integer()
+ }.
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index 24f7e4354e..aa7a12fbaa 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -19,7 +19,7 @@
-export([cluster_state/1, status/2]).
-export([update_consumer_handler/8, update_consumer/9]).
-export([cancel_consumer_handler/2, cancel_consumer/3]).
--export([become_leader/2, handle_tick/3]).
+-export([become_leader/2, handle_tick/3, spawn_deleter/1]).
-export([rpc_delete_metrics/1]).
-export([format/1]).
-export([open_files/1]).
@@ -165,6 +165,9 @@ ra_machine_config(Q) when ?is_amqqueue(Q) ->
MaxMemoryLength = args_policy_lookup(<<"max-in-memory-length">>, fun min/2, Q),
MaxMemoryBytes = args_policy_lookup(<<"max-in-memory-bytes">>, fun min/2, Q),
DeliveryLimit = args_policy_lookup(<<"delivery-limit">>, fun min/2, Q),
+ Expires = args_policy_lookup(<<"expires">>,
+ fun (A, _B) -> A end,
+ Q),
#{name => Name,
queue_resource => QName,
dead_letter_handler => dlx_mfa(Q),
@@ -175,7 +178,9 @@ ra_machine_config(Q) when ?is_amqqueue(Q) ->
max_in_memory_bytes => MaxMemoryBytes,
single_active_consumer_on => single_active_consumer_on(Q),
delivery_limit => DeliveryLimit,
- overflow_strategy => overflow(Overflow, drop_head)
+ overflow_strategy => overflow(Overflow, drop_head),
+ created => erlang:system_time(millisecond),
+ expires => Expires
}.
single_active_consumer_on(Q) ->
@@ -305,6 +310,7 @@ is_policy_applicable(_Q, Policy) ->
Applicable = [<<"max-length">>,
<<"max-length-bytes">>,
<<"overflow">>,
+ <<"expires">>,
<<"max-in-memory-length">>,
<<"max-in-memory-bytes">>,
<<"delivery-limit">>,
@@ -319,6 +325,12 @@ rpc_delete_metrics(QName) ->
ets:delete(queue_metrics, QName),
ok.
+spawn_deleter(QName) ->
+ spawn(fun () ->
+ {ok, Q} = rabbit_amqqueue:lookup(QName),
+ delete(Q, false, false, <<"expired">>)
+ end).
+
handle_tick(QName,
{Name, MR, MU, M, C, MsgBytesReady, MsgBytesUnack},
Nodes) ->
@@ -1351,7 +1363,7 @@ maybe_send_reply(_ChPid, undefined) -> ok;
maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg).
check_invalid_arguments(QueueName, Args) ->
- Keys = [<<"x-expires">>, <<"x-message-ttl">>,
+ Keys = [<<"x-message-ttl">>,
<<"x-max-priority">>, <<"x-queue-mode">>],
[case rabbit_misc:table_lookup(Args, Key) of
undefined -> ok;
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl
index ff09a70c98..cc54aae78e 100644
--- a/test/quorum_queue_SUITE.erl
+++ b/test/quorum_queue_SUITE.erl
@@ -130,7 +130,8 @@ all_tests() ->
consumer_metrics,
invalid_policy,
delete_if_empty,
- delete_if_unused
+ delete_if_unused,
+ queue_ttl
].
memory_tests() ->
@@ -340,11 +341,6 @@ declare_invalid_args(Config) ->
{{shutdown, {server_initiated_close, 406, _}}, _},
declare(rabbit_ct_client_helpers:open_channel(Config, Server),
LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
- {<<"x-expires">>, long, 2000}])),
- ?assertExit(
- {{shutdown, {server_initiated_close, 406, _}}, _},
- declare(rabbit_ct_client_helpers:open_channel(Config, Server),
- LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
{<<"x-message-ttl">>, long, 2000}])),
?assertExit(
@@ -2462,6 +2458,29 @@ delete_if_unused(Config) ->
amqp_channel:call(Ch, #'queue.delete'{queue = QQ,
if_unused = true})).
+queue_ttl(Config) ->
+ Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
+ {<<"x-expires">>, long, 1000}])),
+ timer:sleep(5500),
+ %% check queue no longer exists
+ ?assertExit(
+ {{shutdown,
+ {server_initiated_close,404,
+ <<"NOT_FOUND - no queue 'queue_ttl' in vhost '/'">>}},
+ _},
+ amqp_channel:call(Ch, #'queue.declare'{queue = QQ,
+ passive = true,
+ durable = true,
+ auto_delete = false,
+ arguments = [{<<"x-queue-type">>, longstr, <<"quorum">>},
+ {<<"x-expires">>, long, 1000}]})),
+ ok.
+
%%----------------------------------------------------------------------------
declare(Ch, Q) ->
diff --git a/test/rabbit_fifo_SUITE.erl b/test/rabbit_fifo_SUITE.erl
index ce2e4d435a..bbd8bd7df8 100644
--- a/test/rabbit_fifo_SUITE.erl
+++ b/test/rabbit_fifo_SUITE.erl
@@ -802,7 +802,7 @@ single_active_consumer_cancel_consumer_when_channel_is_down_test(_) ->
State1 = lists:foldl(AddConsumer, State0, Consumers),
% the channel of the active consumer goes down
- {State2, _, Effects} = apply(#{index => 2}, {down, Pid1, noproc}, State1),
+ {State2, _, Effects} = apply(meta(2), {down, Pid1, noproc}, State1),
% fell back to another consumer
?assertEqual(1, map_size(State2#rabbit_fifo.consumers)),
% there are still waiting consumers
@@ -815,7 +815,7 @@ single_active_consumer_cancel_consumer_when_channel_is_down_test(_) ->
update_consumer_handler, _}, Effects),
% the channel of the active consumer and a waiting consumer goes down
- {State3, _, Effects2} = apply(#{index => 3}, {down, Pid2, noproc}, State2),
+ {State3, _, Effects2} = apply(meta(3), {down, Pid2, noproc}, State2),
% fell back to another consumer
?assertEqual(1, map_size(State3#rabbit_fifo.consumers)),
% no more waiting consumer
@@ -829,7 +829,7 @@ single_active_consumer_cancel_consumer_when_channel_is_down_test(_) ->
update_consumer_handler, _}, Effects2),
% the last channel goes down
- {State4, _, Effects3} = apply(#{index => 4}, {down, Pid3, doesnotmatter}, State3),
+ {State4, _, Effects3} = apply(meta(4), {down, Pid3, doesnotmatter}, State3),
% no more consumers
?assertEqual(0, map_size(State4#rabbit_fifo.consumers)),
?assertEqual(0, length(State4#rabbit_fifo.waiting_consumers)),
@@ -1168,11 +1168,11 @@ active_flag_not_updated_when_consumer_suspected_unsuspected_and_single_active_co
[{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2},
{<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]),
- {State2, _, Effects2} = apply(#{index => 2}, {down, Pid1, noconnection}, State1),
+ {State2, _, Effects2} = apply(meta(2), {down, Pid1, noconnection}, State1),
% one monitor and one consumer status update (deactivated)
?assertEqual(3, length(Effects2)),
- {_, _, Effects3} = apply(#{index => 3}, {nodeup, node(self())}, State2),
+ {_, _, Effects3} = apply(meta(3), {nodeup, node(self())}, State2),
% for each consumer: 1 effect to monitor the consumer PID
?assertEqual(5, length(Effects3)).
@@ -1403,7 +1403,12 @@ purge_nodes_test(_) ->
ok.
meta(Idx) ->
- #{index => Idx, term => 1,
+ meta(Idx, 0).
+
+meta(Idx, Timestamp) ->
+ #{index => Idx,
+ term => 1,
+ system_time => Timestamp,
from => {make_ref(), self()}}.
enq(Idx, MsgSeq, Msg, State) ->
@@ -1509,6 +1514,110 @@ machine_version_test(_) ->
?assertEqual(1, lqueue:len(Msgs)),
ok.
+queue_ttl_test(_) ->
+ QName = rabbit_misc:r(<<"/">>, queue, <<"test">>),
+ Conf = #{name => ?FUNCTION_NAME,
+ queue_resource => QName,
+ created => 1000,
+ expires => 1000},
+ S0 = rabbit_fifo:init(Conf),
+ Now = 1500,
+ [{mod_call, _, handle_tick, _}] = rabbit_fifo:tick(Now, S0),
+ %% this should delete the queue
+ [{mod_call, rabbit_quorum_queue, spawn_deleter, [QName]}]
+ = rabbit_fifo:tick(Now + 1000, S0),
+ %% adding a consumer should not ever trigger deletion
+ Cid = {<<"cid1">>, self()},
+ {S1, _} = check_auto(Cid, 1, S0),
+ [{mod_call, _, handle_tick, _}] = rabbit_fifo:tick(Now, S1),
+ [{mod_call, _, handle_tick, _}] = rabbit_fifo:tick(Now + 1000, S1),
+ %% cancelling the consumer should make the queue eligible to expire again
+ {S2, _, _} = apply(meta(2, Now),
+ rabbit_fifo:make_checkout(Cid, cancel, #{}), S1),
+ %% last_active should have been reset when consumer was cancelled
+ %% last_active = 2500
+ [{mod_call, _, handle_tick, _}] = rabbit_fifo:tick(Now + 1000, S2),
+ %% but now it should be deleted
+ [{mod_call, rabbit_quorum_queue, spawn_deleter, [QName]}]
+ = rabbit_fifo:tick(Now + 2500, S2),
+
+ %% Same for downs
+ {S2D, _, _} = apply(meta(2, Now),
+ {down, self(), noconnection}, S1),
+ %% last_active should have been reset when the down was handled
+ %% last_active = 2500
+ [{mod_call, _, handle_tick, _}] = rabbit_fifo:tick(Now + 1000, S2D),
+ %% but now it should be deleted
+ [{mod_call, rabbit_quorum_queue, spawn_deleter, [QName]}]
+ = rabbit_fifo:tick(Now + 2500, S2D),
+
+ %% dequeue should set last_active
+ {S1Deq, {dequeue, empty}} =
+ apply(meta(2, Now),
+ rabbit_fifo:make_checkout(Cid, {dequeue, unsettled}, #{}),
+ S0),
+
+ [{mod_call, _, handle_tick, _}] = rabbit_fifo:tick(Now + 1000, S1Deq),
+ %% but now it should be deleted
+ [{mod_call, rabbit_quorum_queue, spawn_deleter, [QName]}]
+ = rabbit_fifo:tick(Now + 2500, S1Deq),
+ %% Enqueue message,
+ {E1, _, _} = apply(meta(2, Now),
+ rabbit_fifo:make_enqueue(self(), 1, msg1), S0),
+ Deq = {<<"deq1">>, self()},
+ {E2, {dequeue, {MsgId, _}, _}, _} =
+ apply(meta(3, Now),
+ rabbit_fifo:make_checkout(Deq, {dequeue, unsettled}, #{}),
+ E1),
+ {E3, _, _} = apply(meta(3, Now + 1000),
+ rabbit_fifo:make_settle(Deq, [MsgId]), E2),
+ [{mod_call, _, handle_tick, _}] = rabbit_fifo:tick(Now + 1500, E3),
+ %% but now it should be deleted
+ [{mod_call, rabbit_quorum_queue, spawn_deleter, [QName]}]
+ = rabbit_fifo:tick(Now + 3000, E3),
+
+ ok.
+
+queue_ttl_with_single_active_consumer_test(_) ->
+ QName = rabbit_misc:r(<<"/">>, queue, <<"test">>),
+ Conf = #{name => ?FUNCTION_NAME,
+ queue_resource => QName,
+ created => 1000,
+ expires => 1000,
+ single_active_consumer_on => true},
+ S0 = rabbit_fifo:init(Conf),
+ Now = 1500,
+ [{mod_call, _, handle_tick, _}] = rabbit_fifo:tick(Now, S0),
+ %% this should delete the queue
+ [{mod_call, rabbit_quorum_queue, spawn_deleter, [QName]}]
+ = rabbit_fifo:tick(Now + 1000, S0),
+ %% adding a consumer should not ever trigger deletion
+ Cid = {<<"cid1">>, self()},
+ {S1, _} = check_auto(Cid, 1, S0),
+ [{mod_call, _, handle_tick, _}] = rabbit_fifo:tick(Now, S1),
+ [{mod_call, _, handle_tick, _}] = rabbit_fifo:tick(Now + 1000, S1),
+ %% cancelling the consumer should make the queue eligible to expire again
+ {S2, _, _} = apply(meta(2, Now),
+ rabbit_fifo:make_checkout(Cid, cancel, #{}), S1),
+ %% last_active should have been reset when consumer was cancelled
+ %% last_active = 2500
+ [{mod_call, _, handle_tick, _}] = rabbit_fifo:tick(Now + 1000, S2),
+ %% but now it should be deleted
+ [{mod_call, rabbit_quorum_queue, spawn_deleter, [QName]}]
+ = rabbit_fifo:tick(Now + 2500, S2),
+
+ %% Same for downs
+ {S2D, _, _} = apply(meta(2, Now),
+ {down, self(), noconnection}, S1),
+ %% last_active should have been reset when the down was handled
+ %% last_active = 2500
+ [{mod_call, _, handle_tick, _}] = rabbit_fifo:tick(Now + 1000, S2D),
+ %% but now it should be deleted
+ [{mod_call, rabbit_quorum_queue, spawn_deleter, [QName]}]
+ = rabbit_fifo:tick(Now + 2500, S2D),
+
+ ok.
+
%% Utility
init(Conf) -> rabbit_fifo:init(Conf).
diff --git a/test/rabbit_fifo_prop_SUITE.erl b/test/rabbit_fifo_prop_SUITE.erl
index dd2c7154d0..859db2178f 100644
--- a/test/rabbit_fifo_prop_SUITE.erl
+++ b/test/rabbit_fifo_prop_SUITE.erl
@@ -1081,7 +1081,7 @@ do_apply(Cmd, #t{effects = Effs,
%% down
T;
_ ->
- {St, Effects} = case rabbit_fifo:apply(#{index => Index}, Cmd, S0) of
+ {St, Effects} = case rabbit_fifo:apply(meta(Index), Cmd, S0) of
{S, _, E} when is_list(E) ->
{S, E};
{S, _, E} ->
@@ -1196,7 +1196,7 @@ test_init(Conf) ->
rabbit_fifo:init(maps:merge(Default, Conf)).
meta(Idx) ->
- #{index => Idx, term => 1}.
+ #{index => Idx, term => 1, system_time => 0}.
make_checkout(Cid, Spec) ->
rabbit_fifo:make_checkout(Cid, Spec, #{}).