diff options
author | David Ansari <david.ansari@gmx.de> | 2022-01-21 19:32:13 +0100 |
---|---|---|
committer | David Ansari <david.ansari@gmx.de> | 2022-01-21 19:35:14 +0100 |
commit | 06fd32ff1577652732c49f2b1cfe0e1585cd0537 (patch) | |
tree | deb916fb3760097370251f051b305f8b62ee30a6 | |
parent | b5b327069f1415118d1bb8147206a94da30b2088 (diff) | |
download | rabbitmq-server-git-quorum-queues-v2.tar.gz |
Emit release cursor when expiring messagesquorum-queues-v2
Before this commit, when message TTL was set and
messages were only enqueued, a release cursor was never emitted
continuously increasing disk usage.
-rw-r--r-- | deps/rabbit/src/rabbit_fifo.erl | 86 | ||||
-rw-r--r-- | deps/rabbit/test/rabbit_fifo_SUITE.erl | 15 |
2 files changed, 59 insertions, 42 deletions
diff --git a/deps/rabbit/src/rabbit_fifo.erl b/deps/rabbit/src/rabbit_fifo.erl index 6904be3ccf..7725576bf5 100644 --- a/deps/rabbit/src/rabbit_fifo.erl +++ b/deps/rabbit/src/rabbit_fifo.erl @@ -384,7 +384,7 @@ apply(#{index := Index, State1 = update_consumer(ConsumerId, ConsumerMeta, {once, 1, simple_prefetch}, 0, State0), - {success, _, MsgId, Msg, State2, Effects0} = checkout_one(Meta, State1, []), + {success, _, MsgId, Msg, ExpiredMsg, State2, Effects0} = checkout_one(Meta, false, State1, []), {State4, Effects1} = case Settlement of unsettled -> {_, Pid} = ConsumerId, @@ -407,11 +407,13 @@ apply(#{index := Index, end, NotifyEffect = notify_decorators_effect(State4), - case evaluate_limit(Index, false, State0, State4, [NotifyEffect | Effects2]) of - {State, true, Effects} -> - update_smallest_raft_index(Index, Reply, State, Effects); - {State, false, Effects} -> - {State, Reply, Effects} + {State, DroppedMsg, Effects} = evaluate_limit(Index, false, State0, State4, + [NotifyEffect | Effects2]), + case {DroppedMsg, ExpiredMsg} of + {false, false} -> + {State, Reply, Effects}; + _ -> + update_smallest_raft_index(Index, Reply, State, Effects) end end; apply(#{index := Idx} = Meta, @@ -461,9 +463,8 @@ apply(#{index := Index}, #purge{}, update_smallest_raft_index(Index, Reply, State, Effects); apply(#{index := Idx}, #garbage_collection{}, State) -> update_smallest_raft_index(Idx, ok, State, [{aux, garbage_collection}]); -apply(#{system_time := Ts} = Meta, {timeout, expire_msgs}, State0) -> - {State, Effects} = expire_msgs(Ts, State0, []), - checkout(Meta, State, State, Effects, false); +apply(Meta, {timeout, expire_msgs}, State) -> + checkout(Meta, State, State, [], false); apply(#{system_time := Ts} = Meta, {down, Pid, noconnection}, #?MODULE{consumers = Cons0, cfg = #cfg{consumer_strategy = single_active}, @@ -1787,58 +1788,59 @@ checkout(#{index := Index} = Meta, #?MODULE{cfg = #cfg{resource = QName}} = OldState, State0, Effects0, HandleConsumerChanges) -> {#?MODULE{cfg = #cfg{dead_letter_handler = DLH}, - dlx = DlxState0} = State1, _Result, Effects1} = - checkout0(Meta, checkout_one(Meta, State0, Effects0), #{}), + dlx = DlxState0} = State1, ExpiredMsg, Effects1} = + checkout0(Meta, checkout_one(Meta, false, State0, Effects0), #{}), {DlxState, DlxDeliveryEffects} = rabbit_fifo_dlx:checkout(DLH, DlxState0), State2 = State1#?MODULE{dlx = DlxState}, Effects2 = DlxDeliveryEffects ++ Effects1, - case evaluate_limit(Index, false, OldState, State2, Effects2) of - {State, true, Effects} -> + {State, DroppedMsg, Effects} = evaluate_limit(Index, false, OldState, State2, Effects2), + case {DroppedMsg, ExpiredMsg} of + {false, false} -> case maybe_notify_decorators(State, HandleConsumerChanges) of {true, {MaxActivePriority, IsEmpty}} -> NotifyEffect = notify_decorators_effect(QName, MaxActivePriority, IsEmpty), - update_smallest_raft_index(Index, State, [NotifyEffect | Effects]); + {State, ok, [NotifyEffect | Effects]}; false -> - update_smallest_raft_index(Index, State, Effects) + {State, ok, Effects} end; - {State, false, Effects} -> + _ -> case maybe_notify_decorators(State, HandleConsumerChanges) of {true, {MaxActivePriority, IsEmpty}} -> NotifyEffect = notify_decorators_effect(QName, MaxActivePriority, IsEmpty), - {State, ok, [NotifyEffect | Effects]}; + update_smallest_raft_index(Index, State, [NotifyEffect | Effects]); false -> - {State, ok, Effects} + update_smallest_raft_index(Index, State, Effects) end end. checkout0(Meta, {success, ConsumerId, MsgId, - ?INDEX_MSG(RaftIdx, ?DISK_MSG(Header)), State, Effects}, + ?INDEX_MSG(RaftIdx, ?DISK_MSG(Header)), ExpiredMsg, State, Effects}, SendAcc0) when is_integer(RaftIdx) -> DelMsg = {RaftIdx, {MsgId, Header}}, SendAcc = maps:update_with(ConsumerId, fun ({InMem, LogMsgs}) -> {InMem, [DelMsg | LogMsgs]} end, {[], [DelMsg]}, SendAcc0), - checkout0(Meta, checkout_one(Meta, State, Effects), SendAcc); + checkout0(Meta, checkout_one(Meta, ExpiredMsg, State, Effects), SendAcc); checkout0(Meta, {success, ConsumerId, MsgId, - ?INDEX_MSG(Idx, ?MSG(Header, Msg)), State, Effects}, + ?INDEX_MSG(Idx, ?MSG(Header, Msg)), ExpiredMsg, State, Effects}, SendAcc0) when is_integer(Idx) -> DelMsg = {MsgId, {Header, Msg}}, SendAcc = maps:update_with(ConsumerId, fun ({InMem, LogMsgs}) -> {[DelMsg | InMem], LogMsgs} end, {[DelMsg], []}, SendAcc0), - checkout0(Meta, checkout_one(Meta, State, Effects), SendAcc); -checkout0(Meta, {success, _ConsumerId, _MsgId, ?TUPLE(_, _), State, Effects}, + checkout0(Meta, checkout_one(Meta, ExpiredMsg, State, Effects), SendAcc); +checkout0(Meta, {success, _ConsumerId, _MsgId, ?TUPLE(_, _), ExpiredMsg, State, Effects}, SendAcc) -> %% Do not append delivery effect for prefix messages. %% Prefix messages do not exist anymore, but they still go through the %% normal checkout flow to derive correct consumer states %% after recovery and will still be settled or discarded later on. - checkout0(Meta, checkout_one(Meta, State, Effects), SendAcc); -checkout0(_Meta, {Activity, State0, Effects0}, SendAcc) -> + checkout0(Meta, checkout_one(Meta, ExpiredMsg, State, Effects), SendAcc); +checkout0(_Meta, {Activity, ExpiredMsg, State0, Effects0}, SendAcc) -> Effects1 = case Activity of nochange -> append_delivery_effects(Effects0, SendAcc); @@ -1846,7 +1848,7 @@ checkout0(_Meta, {Activity, State0, Effects0}, SendAcc) -> [{aux, inactive} | append_delivery_effects(Effects0, SendAcc)] end, - {State0, ok, lists:reverse(Effects1)}. + {State0, ExpiredMsg, lists:reverse(Effects1)}. evaluate_limit(_Index, Result, _BeforeState, #?MODULE{cfg = #cfg{max_length = undefined, @@ -2014,13 +2016,13 @@ reply_log_effect(RaftIdx, MsgId, Header, Ready, From) -> {dequeue, {MsgId, {Header, Msg}}, Ready}}}] end}. -checkout_one(#{system_time := Ts} = Meta, InitState0, Effects0) -> +checkout_one(#{system_time := Ts} = Meta, ExpiredMsg0, InitState0, Effects0) -> %% Before checking out any messsage to any consumer, %% first remove all expired messages from the head of the queue. - {#?MODULE{service_queue = SQ0, - messages = Messages0, - consumers = Cons0} = InitState, Effects1} = - expire_msgs(Ts, InitState0, Effects0), + {ExpiredMsg, #?MODULE{service_queue = SQ0, + messages = Messages0, + consumers = Cons0} = InitState, Effects1} = + expire_msgs(Ts, ExpiredMsg0, InitState0, Effects0), case priority_queue:out(SQ0) of {{value, ConsumerId}, SQ1} @@ -2037,11 +2039,11 @@ checkout_one(#{system_time := Ts} = Meta, InitState0, Effects0) -> %% NB: these retry cases introduce the "queue list reversal" %% inefficiency but this is a rare thing to happen %% so should not need optimising - checkout_one(Meta, InitState#?MODULE{service_queue = SQ1}, Effects1); + checkout_one(Meta, ExpiredMsg, InitState#?MODULE{service_queue = SQ1}, Effects1); #consumer{status = cancelled} -> - checkout_one(Meta, InitState#?MODULE{service_queue = SQ1}, Effects1); + checkout_one(Meta, ExpiredMsg, InitState#?MODULE{service_queue = SQ1}, Effects1); #consumer{status = suspected_down} -> - checkout_one(Meta, InitState#?MODULE{service_queue = SQ1}, Effects1); + checkout_one(Meta, ExpiredMsg, InitState#?MODULE{service_queue = SQ1}, Effects1); #consumer{checked_out = Checked0, next_msg_id = Next, credit = Credit, @@ -2064,26 +2066,26 @@ checkout_one(#{system_time := Ts} = Meta, InitState0, Effects0) -> subtract_in_memory_counts( Header, add_bytes_checkout(Header, State1)) end, - {success, ConsumerId, Next, ConsumerMsg, State, Effects1} + {success, ConsumerId, Next, ConsumerMsg, ExpiredMsg, State, Effects1} end; empty -> - {nochange, InitState, Effects1} + {nochange, ExpiredMsg, InitState, Effects1} end; {{value, _ConsumerId}, SQ1} -> %% consumer did not exist but was queued, recurse - checkout_one(Meta, InitState#?MODULE{service_queue = SQ1}, Effects1); + checkout_one(Meta, ExpiredMsg, InitState#?MODULE{service_queue = SQ1}, Effects1); {empty, _} -> % Effects = timer_effect(Ts, InitState, Effects1), case lqueue:len(Messages0) of 0 -> - {nochange, InitState, Effects1}; + {nochange, ExpiredMsg, InitState, Effects1}; _ -> - {inactive, InitState, Effects1} + {inactive, ExpiredMsg, InitState, Effects1} end end. %% dequeue all expired messages -expire_msgs(RaCmdTs, State, Effects) -> +expire_msgs(RaCmdTs, Result, State, Effects) -> %% In the normal case, there are no expired messages. %% Therefore, first queue:peek/1 to check whether we need to queue:out/1 %% because the latter can be much slower than the former. @@ -2098,7 +2100,7 @@ expire_msgs(RaCmdTs, State, Effects) -> when RaCmdTs >= Expiry -> expire(RaCmdTs, Header, State, Effects); _ -> - {State, Effects} + {Result, State, Effects} end. expire(RaCmdTs, Header, State0, Effects) -> @@ -2124,7 +2126,7 @@ expire(RaCmdTs, Header, State0, Effects) -> State3 end, State = decr_total(State5), - expire_msgs(RaCmdTs, State, DlxEffects ++ Effects). + expire_msgs(RaCmdTs, true, State, DlxEffects ++ Effects). timer_effect(RaCmdTs, State, Effects) -> T = case peek_next_msg(State) of diff --git a/deps/rabbit/test/rabbit_fifo_SUITE.erl b/deps/rabbit/test/rabbit_fifo_SUITE.erl index 4d16bec338..481dcb2151 100644 --- a/deps/rabbit/test/rabbit_fifo_SUITE.erl +++ b/deps/rabbit/test/rabbit_fifo_SUITE.erl @@ -1737,6 +1737,21 @@ empty_dequeue_should_emit_release_cursor_test(_) -> ?ASSERT_EFF({release_cursor, _, _}, Effects), ok. +expire_message_should_emit_release_cursor_test(_) -> + Conf = #{name => ?FUNCTION_NAME, + queue_resource => rabbit_misc:r(<<"/">>, queue, <<"test">>), + release_cursor_interval => 0, + msg_ttl => 1}, + S0 = rabbit_fifo:init(Conf), + Msg = #basic_message{content = #content{properties = none, + payload_fragments_rev = []}}, + {S1, ok, _} = apply(meta(1, 100), rabbit_fifo:make_enqueue(self(), 1, Msg), S0), + {_S, ok, Effs} = apply(meta(2, 101), + rabbit_fifo:make_enqueue(self(), 2, Msg), + S1), + ?ASSERT_EFF({release_cursor, 1, _}, Effs), + ok. + %% Utility init(Conf) -> rabbit_fifo:init(Conf). |