summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid Ansari <david.ansari@gmx.de>2022-01-21 19:32:13 +0100
committerDavid Ansari <david.ansari@gmx.de>2022-01-21 19:35:14 +0100
commit06fd32ff1577652732c49f2b1cfe0e1585cd0537 (patch)
treedeb916fb3760097370251f051b305f8b62ee30a6
parentb5b327069f1415118d1bb8147206a94da30b2088 (diff)
downloadrabbitmq-server-git-quorum-queues-v2.tar.gz
Emit release cursor when expiring messagesquorum-queues-v2
Before this commit, when message TTL was set and messages were only enqueued, a release cursor was never emitted continuously increasing disk usage.
-rw-r--r--deps/rabbit/src/rabbit_fifo.erl86
-rw-r--r--deps/rabbit/test/rabbit_fifo_SUITE.erl15
2 files changed, 59 insertions, 42 deletions
diff --git a/deps/rabbit/src/rabbit_fifo.erl b/deps/rabbit/src/rabbit_fifo.erl
index 6904be3ccf..7725576bf5 100644
--- a/deps/rabbit/src/rabbit_fifo.erl
+++ b/deps/rabbit/src/rabbit_fifo.erl
@@ -384,7 +384,7 @@ apply(#{index := Index,
State1 = update_consumer(ConsumerId, ConsumerMeta,
{once, 1, simple_prefetch}, 0,
State0),
- {success, _, MsgId, Msg, State2, Effects0} = checkout_one(Meta, State1, []),
+ {success, _, MsgId, Msg, ExpiredMsg, State2, Effects0} = checkout_one(Meta, false, State1, []),
{State4, Effects1} = case Settlement of
unsettled ->
{_, Pid} = ConsumerId,
@@ -407,11 +407,13 @@ apply(#{index := Index,
end,
NotifyEffect = notify_decorators_effect(State4),
- case evaluate_limit(Index, false, State0, State4, [NotifyEffect | Effects2]) of
- {State, true, Effects} ->
- update_smallest_raft_index(Index, Reply, State, Effects);
- {State, false, Effects} ->
- {State, Reply, Effects}
+ {State, DroppedMsg, Effects} = evaluate_limit(Index, false, State0, State4,
+ [NotifyEffect | Effects2]),
+ case {DroppedMsg, ExpiredMsg} of
+ {false, false} ->
+ {State, Reply, Effects};
+ _ ->
+ update_smallest_raft_index(Index, Reply, State, Effects)
end
end;
apply(#{index := Idx} = Meta,
@@ -461,9 +463,8 @@ apply(#{index := Index}, #purge{},
update_smallest_raft_index(Index, Reply, State, Effects);
apply(#{index := Idx}, #garbage_collection{}, State) ->
update_smallest_raft_index(Idx, ok, State, [{aux, garbage_collection}]);
-apply(#{system_time := Ts} = Meta, {timeout, expire_msgs}, State0) ->
- {State, Effects} = expire_msgs(Ts, State0, []),
- checkout(Meta, State, State, Effects, false);
+apply(Meta, {timeout, expire_msgs}, State) ->
+ checkout(Meta, State, State, [], false);
apply(#{system_time := Ts} = Meta, {down, Pid, noconnection},
#?MODULE{consumers = Cons0,
cfg = #cfg{consumer_strategy = single_active},
@@ -1787,58 +1788,59 @@ checkout(#{index := Index} = Meta,
#?MODULE{cfg = #cfg{resource = QName}} = OldState,
State0, Effects0, HandleConsumerChanges) ->
{#?MODULE{cfg = #cfg{dead_letter_handler = DLH},
- dlx = DlxState0} = State1, _Result, Effects1} =
- checkout0(Meta, checkout_one(Meta, State0, Effects0), #{}),
+ dlx = DlxState0} = State1, ExpiredMsg, Effects1} =
+ checkout0(Meta, checkout_one(Meta, false, State0, Effects0), #{}),
{DlxState, DlxDeliveryEffects} = rabbit_fifo_dlx:checkout(DLH, DlxState0),
State2 = State1#?MODULE{dlx = DlxState},
Effects2 = DlxDeliveryEffects ++ Effects1,
- case evaluate_limit(Index, false, OldState, State2, Effects2) of
- {State, true, Effects} ->
+ {State, DroppedMsg, Effects} = evaluate_limit(Index, false, OldState, State2, Effects2),
+ case {DroppedMsg, ExpiredMsg} of
+ {false, false} ->
case maybe_notify_decorators(State, HandleConsumerChanges) of
{true, {MaxActivePriority, IsEmpty}} ->
NotifyEffect = notify_decorators_effect(QName, MaxActivePriority,
IsEmpty),
- update_smallest_raft_index(Index, State, [NotifyEffect | Effects]);
+ {State, ok, [NotifyEffect | Effects]};
false ->
- update_smallest_raft_index(Index, State, Effects)
+ {State, ok, Effects}
end;
- {State, false, Effects} ->
+ _ ->
case maybe_notify_decorators(State, HandleConsumerChanges) of
{true, {MaxActivePriority, IsEmpty}} ->
NotifyEffect = notify_decorators_effect(QName, MaxActivePriority,
IsEmpty),
- {State, ok, [NotifyEffect | Effects]};
+ update_smallest_raft_index(Index, State, [NotifyEffect | Effects]);
false ->
- {State, ok, Effects}
+ update_smallest_raft_index(Index, State, Effects)
end
end.
checkout0(Meta, {success, ConsumerId, MsgId,
- ?INDEX_MSG(RaftIdx, ?DISK_MSG(Header)), State, Effects},
+ ?INDEX_MSG(RaftIdx, ?DISK_MSG(Header)), ExpiredMsg, State, Effects},
SendAcc0) when is_integer(RaftIdx) ->
DelMsg = {RaftIdx, {MsgId, Header}},
SendAcc = maps:update_with(ConsumerId,
fun ({InMem, LogMsgs}) ->
{InMem, [DelMsg | LogMsgs]}
end, {[], [DelMsg]}, SendAcc0),
- checkout0(Meta, checkout_one(Meta, State, Effects), SendAcc);
+ checkout0(Meta, checkout_one(Meta, ExpiredMsg, State, Effects), SendAcc);
checkout0(Meta, {success, ConsumerId, MsgId,
- ?INDEX_MSG(Idx, ?MSG(Header, Msg)), State, Effects},
+ ?INDEX_MSG(Idx, ?MSG(Header, Msg)), ExpiredMsg, State, Effects},
SendAcc0) when is_integer(Idx) ->
DelMsg = {MsgId, {Header, Msg}},
SendAcc = maps:update_with(ConsumerId,
fun ({InMem, LogMsgs}) ->
{[DelMsg | InMem], LogMsgs}
end, {[DelMsg], []}, SendAcc0),
- checkout0(Meta, checkout_one(Meta, State, Effects), SendAcc);
-checkout0(Meta, {success, _ConsumerId, _MsgId, ?TUPLE(_, _), State, Effects},
+ checkout0(Meta, checkout_one(Meta, ExpiredMsg, State, Effects), SendAcc);
+checkout0(Meta, {success, _ConsumerId, _MsgId, ?TUPLE(_, _), ExpiredMsg, State, Effects},
SendAcc) ->
%% Do not append delivery effect for prefix messages.
%% Prefix messages do not exist anymore, but they still go through the
%% normal checkout flow to derive correct consumer states
%% after recovery and will still be settled or discarded later on.
- checkout0(Meta, checkout_one(Meta, State, Effects), SendAcc);
-checkout0(_Meta, {Activity, State0, Effects0}, SendAcc) ->
+ checkout0(Meta, checkout_one(Meta, ExpiredMsg, State, Effects), SendAcc);
+checkout0(_Meta, {Activity, ExpiredMsg, State0, Effects0}, SendAcc) ->
Effects1 = case Activity of
nochange ->
append_delivery_effects(Effects0, SendAcc);
@@ -1846,7 +1848,7 @@ checkout0(_Meta, {Activity, State0, Effects0}, SendAcc) ->
[{aux, inactive}
| append_delivery_effects(Effects0, SendAcc)]
end,
- {State0, ok, lists:reverse(Effects1)}.
+ {State0, ExpiredMsg, lists:reverse(Effects1)}.
evaluate_limit(_Index, Result, _BeforeState,
#?MODULE{cfg = #cfg{max_length = undefined,
@@ -2014,13 +2016,13 @@ reply_log_effect(RaftIdx, MsgId, Header, Ready, From) ->
{dequeue, {MsgId, {Header, Msg}}, Ready}}}]
end}.
-checkout_one(#{system_time := Ts} = Meta, InitState0, Effects0) ->
+checkout_one(#{system_time := Ts} = Meta, ExpiredMsg0, InitState0, Effects0) ->
%% Before checking out any messsage to any consumer,
%% first remove all expired messages from the head of the queue.
- {#?MODULE{service_queue = SQ0,
- messages = Messages0,
- consumers = Cons0} = InitState, Effects1} =
- expire_msgs(Ts, InitState0, Effects0),
+ {ExpiredMsg, #?MODULE{service_queue = SQ0,
+ messages = Messages0,
+ consumers = Cons0} = InitState, Effects1} =
+ expire_msgs(Ts, ExpiredMsg0, InitState0, Effects0),
case priority_queue:out(SQ0) of
{{value, ConsumerId}, SQ1}
@@ -2037,11 +2039,11 @@ checkout_one(#{system_time := Ts} = Meta, InitState0, Effects0) ->
%% NB: these retry cases introduce the "queue list reversal"
%% inefficiency but this is a rare thing to happen
%% so should not need optimising
- checkout_one(Meta, InitState#?MODULE{service_queue = SQ1}, Effects1);
+ checkout_one(Meta, ExpiredMsg, InitState#?MODULE{service_queue = SQ1}, Effects1);
#consumer{status = cancelled} ->
- checkout_one(Meta, InitState#?MODULE{service_queue = SQ1}, Effects1);
+ checkout_one(Meta, ExpiredMsg, InitState#?MODULE{service_queue = SQ1}, Effects1);
#consumer{status = suspected_down} ->
- checkout_one(Meta, InitState#?MODULE{service_queue = SQ1}, Effects1);
+ checkout_one(Meta, ExpiredMsg, InitState#?MODULE{service_queue = SQ1}, Effects1);
#consumer{checked_out = Checked0,
next_msg_id = Next,
credit = Credit,
@@ -2064,26 +2066,26 @@ checkout_one(#{system_time := Ts} = Meta, InitState0, Effects0) ->
subtract_in_memory_counts(
Header, add_bytes_checkout(Header, State1))
end,
- {success, ConsumerId, Next, ConsumerMsg, State, Effects1}
+ {success, ConsumerId, Next, ConsumerMsg, ExpiredMsg, State, Effects1}
end;
empty ->
- {nochange, InitState, Effects1}
+ {nochange, ExpiredMsg, InitState, Effects1}
end;
{{value, _ConsumerId}, SQ1} ->
%% consumer did not exist but was queued, recurse
- checkout_one(Meta, InitState#?MODULE{service_queue = SQ1}, Effects1);
+ checkout_one(Meta, ExpiredMsg, InitState#?MODULE{service_queue = SQ1}, Effects1);
{empty, _} ->
% Effects = timer_effect(Ts, InitState, Effects1),
case lqueue:len(Messages0) of
0 ->
- {nochange, InitState, Effects1};
+ {nochange, ExpiredMsg, InitState, Effects1};
_ ->
- {inactive, InitState, Effects1}
+ {inactive, ExpiredMsg, InitState, Effects1}
end
end.
%% dequeue all expired messages
-expire_msgs(RaCmdTs, State, Effects) ->
+expire_msgs(RaCmdTs, Result, State, Effects) ->
%% In the normal case, there are no expired messages.
%% Therefore, first queue:peek/1 to check whether we need to queue:out/1
%% because the latter can be much slower than the former.
@@ -2098,7 +2100,7 @@ expire_msgs(RaCmdTs, State, Effects) ->
when RaCmdTs >= Expiry ->
expire(RaCmdTs, Header, State, Effects);
_ ->
- {State, Effects}
+ {Result, State, Effects}
end.
expire(RaCmdTs, Header, State0, Effects) ->
@@ -2124,7 +2126,7 @@ expire(RaCmdTs, Header, State0, Effects) ->
State3
end,
State = decr_total(State5),
- expire_msgs(RaCmdTs, State, DlxEffects ++ Effects).
+ expire_msgs(RaCmdTs, true, State, DlxEffects ++ Effects).
timer_effect(RaCmdTs, State, Effects) ->
T = case peek_next_msg(State) of
diff --git a/deps/rabbit/test/rabbit_fifo_SUITE.erl b/deps/rabbit/test/rabbit_fifo_SUITE.erl
index 4d16bec338..481dcb2151 100644
--- a/deps/rabbit/test/rabbit_fifo_SUITE.erl
+++ b/deps/rabbit/test/rabbit_fifo_SUITE.erl
@@ -1737,6 +1737,21 @@ empty_dequeue_should_emit_release_cursor_test(_) ->
?ASSERT_EFF({release_cursor, _, _}, Effects),
ok.
+expire_message_should_emit_release_cursor_test(_) ->
+ Conf = #{name => ?FUNCTION_NAME,
+ queue_resource => rabbit_misc:r(<<"/">>, queue, <<"test">>),
+ release_cursor_interval => 0,
+ msg_ttl => 1},
+ S0 = rabbit_fifo:init(Conf),
+ Msg = #basic_message{content = #content{properties = none,
+ payload_fragments_rev = []}},
+ {S1, ok, _} = apply(meta(1, 100), rabbit_fifo:make_enqueue(self(), 1, Msg), S0),
+ {_S, ok, Effs} = apply(meta(2, 101),
+ rabbit_fifo:make_enqueue(self(), 2, Msg),
+ S1),
+ ?ASSERT_EFF({release_cursor, 1, _}, Effs),
+ ok.
+
%% Utility
init(Conf) -> rabbit_fifo:init(Conf).