diff options
author | kjnilsson <knilsson@pivotal.io> | 2020-08-21 16:35:02 +0100 |
---|---|---|
committer | kjnilsson <knilsson@pivotal.io> | 2020-09-01 16:36:47 +0100 |
commit | b6f60ff90bbcabf591b7189c90c818b7c11a36f6 (patch) | |
tree | 61720044b575cb452fadeae3082ac1fef180083e | |
parent | 7093e4efc3a2791c47ded7975b66bf7e108c1862 (diff) | |
download | rabbitmq-server-git-qq-queue-ttl.tar.gz |
Quorum queue 'Queue TTL' support.qq-queue-ttl
The quorum queue will track the timestamp of whenever a consumer was removed and
periodically check if the queue should be deleted based on it's expiry
configuration.
-rw-r--r-- | src/rabbit_fifo.erl | 287 | ||||
-rw-r--r-- | src/rabbit_fifo.hrl | 9 | ||||
-rw-r--r-- | src/rabbit_quorum_queue.erl | 18 | ||||
-rw-r--r-- | test/quorum_queue_SUITE.erl | 31 | ||||
-rw-r--r-- | test/rabbit_fifo_SUITE.erl | 121 | ||||
-rw-r--r-- | test/rabbit_fifo_prop_SUITE.erl | 4 |
6 files changed, 318 insertions, 152 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index aea2850084..b59f4b04f8 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -134,6 +134,7 @@ update_config(Conf, State) -> MaxMemoryLength = maps:get(max_in_memory_length, Conf, undefined), MaxMemoryBytes = maps:get(max_in_memory_bytes, Conf, undefined), DeliveryLimit = maps:get(delivery_limit, Conf, undefined), + Expires = maps:get(expires, Conf, undefined), ConsumerStrategy = case maps:get(single_active_consumer_on, Conf, false) of true -> single_active; @@ -148,6 +149,7 @@ update_config(Conf, State) -> {RCI, C} end, + LastActive = maps:get(created, Conf, undefined), State#?MODULE{cfg = Cfg#cfg{release_cursor_interval = RCISpec, dead_letter_handler = DLH, become_leader_handler = BLH, @@ -157,7 +159,9 @@ update_config(Conf, State) -> max_in_memory_length = MaxMemoryLength, max_in_memory_bytes = MaxMemoryBytes, consumer_strategy = ConsumerStrategy, - delivery_limit = DeliveryLimit}}. + delivery_limit = DeliveryLimit, + expires = Expires}, + last_active = LastActive}. zero(_) -> 0. @@ -285,10 +289,14 @@ apply(_, #checkout{spec = {dequeue, _}}, #?MODULE{cfg = #cfg{consumer_strategy = single_active}} = State0) -> {State0, {error, unsupported}}; apply(#{index := Index, + system_time := Ts, from := From} = Meta, #checkout{spec = {dequeue, Settlement}, meta = ConsumerMeta, consumer_id = ConsumerId}, - #?MODULE{consumers = Consumers} = State0) -> + #?MODULE{consumers = Consumers} = State00) -> + %% dequeue always updates last_active + State0 = State00#?MODULE{last_active = Ts}, + %% all dequeue operations result in keeping the queue from expiring Exists = maps:is_key(ConsumerId, Consumers), case messages_ready(State0) of 0 -> @@ -300,7 +308,7 @@ apply(#{index := Index, State1 = update_consumer(ConsumerId, ConsumerMeta, {once, 1, simple_prefetch}, State0), - {success, _, MsgId, Msg, State2} = checkout_one(State1), + {success, _, MsgId, Msg, State2} = checkout_one(Meta, State1), {State4, Effects1} = case Settlement of unsettled -> {_, Pid} = ConsumerId, @@ -332,7 +340,8 @@ apply(#{index := Index, end end; apply(Meta, #checkout{spec = cancel, consumer_id = ConsumerId}, State0) -> - {State, Effects} = cancel_consumer(ConsumerId, State0, [], consumer_cancel), + {State, Effects} = cancel_consumer(Meta, ConsumerId, State0, [], + consumer_cancel), checkout(Meta, State0, State, Effects); apply(Meta, #checkout{spec = Spec, meta = ConsumerMeta, consumer_id = {_, Pid} = ConsumerId}, @@ -364,7 +373,7 @@ apply(#{index := Index}, #purge{}, {State, false, Effects} -> {State, Reply, Effects} end; -apply(Meta, {down, Pid, noconnection}, +apply(#{system_time := Ts} = Meta, {down, Pid, noconnection}, #?MODULE{consumers = Cons0, cfg = #cfg{consumer_strategy = single_active}, waiting_consumers = Waiting0, @@ -381,7 +390,7 @@ apply(Meta, {down, Pid, noconnection}, S0, Cid, C0, false, suspected_down, E0), Checked = C0#consumer.checked_out, Credit = increase_credit(C0, maps:size(Checked)), - {St, Effs1} = return_all(S0, Effs, + {St, Effs1} = return_all(Meta, S0, Effs, Cid, C0#consumer{credit = Credit}), %% if the consumer was cancelled there is a chance it got %% removed when returning hence we need to be defensive here @@ -392,7 +401,8 @@ apply(Meta, {down, Pid, noconnection}, Waiting0 end, {St#?MODULE{consumers = maps:remove(Cid, St#?MODULE.consumers), - waiting_consumers = Waiting}, + waiting_consumers = Waiting, + last_active = Ts}, Effs1}; (_, _, S) -> S @@ -411,7 +421,7 @@ apply(Meta, {down, Pid, noconnection}, end, Enqs0), Effects = [{monitor, node, Node} | Effects1], checkout(Meta, State0, State#?MODULE{enqueuers = Enqs}, Effects); -apply(Meta, {down, Pid, noconnection}, +apply(#{system_time := Ts} = Meta, {down, Pid, noconnection}, #?MODULE{consumers = Cons0, enqueuers = Enqs0} = State0) -> %% A node has been disconnected. This doesn't necessarily mean that @@ -432,7 +442,7 @@ apply(Meta, {down, Pid, noconnection}, Credit = increase_credit(C0, map_size(Checked0)), C = C0#consumer{status = suspected_down, credit = Credit}, - {St, Eff0} = return_all(St0, Eff, Cid, C), + {St, Eff0} = return_all(Meta, St0, Eff, Cid, C), Eff1 = consumer_update_active_effects(St, Cid, C, false, suspected_down, Eff0), {St, Eff1}; @@ -453,13 +463,14 @@ apply(Meta, {down, Pid, noconnection}, _ -> [{monitor, node, Node}] end ++ Effects1, - checkout(Meta, State0, State#?MODULE{enqueuers = Enqs}, Effects); + checkout(Meta, State0, State#?MODULE{enqueuers = Enqs, + last_active = Ts}, Effects); apply(Meta, {down, Pid, _Info}, State0) -> - {State, Effects} = handle_down(Pid, State0), + {State, Effects} = handle_down(Meta, Pid, State0), checkout(Meta, State0, State, Effects); apply(Meta, {nodeup, Node}, #?MODULE{consumers = Cons0, enqueuers = Enqs0, - service_queue = SQ0} = State0) -> + service_queue = _SQ0} = State0) -> %% A node we are monitoring has come back. %% If we have suspected any processes of being %% down we should now re-issue the monitors for them to detect if they're @@ -473,30 +484,29 @@ apply(Meta, {nodeup, Node}, #?MODULE{consumers = Cons0, end, Enqs0), ConsumerUpdateActiveFun = consumer_active_flag_update_function(State0), %% mark all consumers as up - {Cons1, SQ, Effects1} = - maps:fold(fun({_, P} = ConsumerId, C, {CAcc, SQAcc, EAcc}) + {State1, Effects1} = + maps:fold(fun({_, P} = ConsumerId, C, {SAcc, EAcc}) when (node(P) =:= Node) and (C#consumer.status =/= cancelled) -> - EAcc1 = ConsumerUpdateActiveFun(State0, ConsumerId, + EAcc1 = ConsumerUpdateActiveFun(SAcc, ConsumerId, C, true, up, EAcc), - update_or_remove_sub(ConsumerId, - C#consumer{status = up}, CAcc, - SQAcc, EAcc1); + {update_or_remove_sub(Meta, ConsumerId, + C#consumer{status = up}, + SAcc), EAcc1}; (_, _, Acc) -> Acc - end, {Cons0, SQ0, Monitors}, Cons0), - Waiting = update_waiting_consumer_status(Node, State0, up), - State1 = State0#?MODULE{consumers = Cons1, + end, {State0, Monitors}, Cons0), + Waiting = update_waiting_consumer_status(Node, State1, up), + State2 = State1#?MODULE{ enqueuers = Enqs1, - service_queue = SQ, waiting_consumers = Waiting}, - {State, Effects} = activate_next_consumer(State1, Effects1), + {State, Effects} = activate_next_consumer(State2, Effects1), checkout(Meta, State0, State, Effects); apply(_, {nodedown, _Node}, State) -> {State, ok}; -apply(_, #purge_nodes{nodes = Nodes}, State0) -> +apply(Meta, #purge_nodes{nodes = Nodes}, State0) -> {State, Effects} = lists:foldl(fun(Node, {S, E}) -> - purge_node(Node, S, E) + purge_node(Meta, Node, S, E) end, {State0, []}, Nodes), {State, ok, Effects}; apply(Meta, #update_config{config = Conf}, State) -> @@ -549,15 +559,15 @@ convert_v0_to_v1(V0State0) -> msgs_ready_in_memory = rabbit_fifo_v0:get_field(msgs_ready_in_memory, V0State) }. -purge_node(Node, State, Effects) -> +purge_node(Meta, Node, State, Effects) -> lists:foldl(fun(Pid, {S0, E0}) -> - {S, E} = handle_down(Pid, S0), + {S, E} = handle_down(Meta, Pid, S0), {S, E0 ++ E} end, {State, Effects}, all_pids_for(Node, State)). %% any downs that re not noconnection -handle_down(Pid, #?MODULE{consumers = Cons0, - enqueuers = Enqs0} = State0) -> +handle_down(Meta, Pid, #?MODULE{consumers = Cons0, + enqueuers = Enqs0} = State0) -> % Remove any enqueuer for the same pid and enqueue any pending messages % This should be ok as we won't see any more enqueues from this pid State1 = case maps:take(Pid, Enqs0) of @@ -574,7 +584,7 @@ handle_down(Pid, #?MODULE{consumers = Cons0, DownConsumers = maps:keys( maps:filter(fun({_, P}, _) -> P =:= Pid end, Cons0)), lists:foldl(fun(ConsumerId, {S, E}) -> - cancel_consumer(ConsumerId, S, E, down) + cancel_consumer(Meta, ConsumerId, S, E, down) end, {State2, Effects1}, DownConsumers). consumer_active_flag_update_function(#?MODULE{cfg = #cfg{consumer_strategy = competing}}) -> @@ -666,19 +676,24 @@ state_enter(State, #?MODULE{cfg = #cfg{resource = _Resource}}) when State =/= le -spec tick(non_neg_integer(), state()) -> ra_machine:effects(). -tick(_Ts, #?MODULE{cfg = #cfg{name = Name, - resource = QName}, +tick(Ts, #?MODULE{cfg = #cfg{name = Name, + resource = QName}, msg_bytes_enqueue = EnqueueBytes, msg_bytes_checkout = CheckoutBytes} = State) -> - Metrics = {Name, - messages_ready(State), - num_checked_out(State), % checked out - messages_total(State), - query_consumer_count(State), % Consumers - EnqueueBytes, - CheckoutBytes}, - [{mod_call, rabbit_quorum_queue, - handle_tick, [QName, Metrics, all_nodes(State)]}]. + case is_expired(Ts, State) of + true -> + [{mod_call, rabbit_quorum_queue, spawn_deleter, [QName]}]; + false -> + Metrics = {Name, + messages_ready(State), + num_checked_out(State), % checked out + messages_total(State), + query_consumer_count(State), % Consumers + EnqueueBytes, + CheckoutBytes}, + [{mod_call, rabbit_quorum_queue, + handle_tick, [QName, Metrics, all_nodes(State)]}] + end. -spec overview(state()) -> map(). overview(#?MODULE{consumers = Cons, @@ -933,17 +948,17 @@ num_checked_out(#?MODULE{consumers = Cons}) -> maps:size(C) + Acc end, 0, Cons). -cancel_consumer(ConsumerId, +cancel_consumer(Meta, ConsumerId, #?MODULE{cfg = #cfg{consumer_strategy = competing}} = State, Effects, Reason) -> - cancel_consumer0(ConsumerId, State, Effects, Reason); -cancel_consumer(ConsumerId, + cancel_consumer0(Meta, ConsumerId, State, Effects, Reason); +cancel_consumer(Meta, ConsumerId, #?MODULE{cfg = #cfg{consumer_strategy = single_active}, waiting_consumers = []} = State, Effects, Reason) -> %% single active consumer on, no consumers are waiting - cancel_consumer0(ConsumerId, State, Effects, Reason); -cancel_consumer(ConsumerId, + cancel_consumer0(Meta, ConsumerId, State, Effects, Reason); +cancel_consumer(Meta, ConsumerId, #?MODULE{consumers = Cons0, cfg = #cfg{consumer_strategy = single_active}, waiting_consumers = Waiting0} = State0, @@ -952,7 +967,7 @@ cancel_consumer(ConsumerId, case maps:is_key(ConsumerId, Cons0) of true -> % The active consumer is to be removed - {State1, Effects1} = cancel_consumer0(ConsumerId, State0, + {State1, Effects1} = cancel_consumer0(Meta, ConsumerId, State0, Effects0, Reason), activate_next_consumer(State1, Effects1); false -> @@ -976,11 +991,12 @@ consumer_update_active_effects(#?MODULE{cfg = #cfg{resource = QName}}, [QName, ConsumerId, false, Ack, Prefetch, Active, ActivityStatus, Args]} | Effects]. -cancel_consumer0(ConsumerId, #?MODULE{consumers = C0} = S0, Effects0, Reason) -> +cancel_consumer0(Meta, ConsumerId, + #?MODULE{consumers = C0} = S0, Effects0, Reason) -> case C0 of #{ConsumerId := Consumer} -> - {S, Effects2} = maybe_return_all(ConsumerId, Consumer, S0, - Effects0, Reason), + {S, Effects2} = maybe_return_all(Meta, ConsumerId, Consumer, + S0, Effects0, Reason), %% The effects are emitted before the consumer is actually removed %% if the consumer has unacked messages. This is a bit weird but %% in line with what classic queues do (from an external point of @@ -1029,23 +1045,18 @@ activate_next_consumer(#?MODULE{consumers = Cons, -maybe_return_all(ConsumerId, Consumer, - #?MODULE{consumers = C0, - service_queue = SQ0} = S0, - Effects0, Reason) -> +maybe_return_all(#{system_time := Ts} = Meta, ConsumerId, Consumer, S0, Effects0, Reason) -> case Reason of consumer_cancel -> - {Cons, SQ, Effects1} = - update_or_remove_sub(ConsumerId, - Consumer#consumer{lifetime = once, - credit = 0, - status = cancelled}, - C0, SQ0, Effects0), - {S0#?MODULE{consumers = Cons, - service_queue = SQ}, Effects1}; + {update_or_remove_sub(Meta, ConsumerId, + Consumer#consumer{lifetime = once, + credit = 0, + status = cancelled}, + S0), Effects0}; down -> - {S1, Effects1} = return_all(S0, Effects0, ConsumerId, Consumer), - {S1#?MODULE{consumers = maps:remove(ConsumerId, S1#?MODULE.consumers)}, + {S1, Effects1} = return_all(Meta, S0, Effects0, ConsumerId, Consumer), + {S1#?MODULE{consumers = maps:remove(ConsumerId, S1#?MODULE.consumers), + last_active = Ts}, Effects1} end. @@ -1139,7 +1150,7 @@ maybe_store_dehydrated_state(RaftIdx, ?RELEASE_CURSOR_EVERY_MAX) end, State = State0#?MODULE{cfg = Cfg#cfg{release_cursor_interval = - {Base, Interval}}}, + {Base, Interval}}}, Dehydrated = dehydrate_state(State), Cursor = {release_cursor, RaftIdx, Dehydrated}, Cursors = lqueue:in(Cursor, Cursors0), @@ -1193,57 +1204,50 @@ snd(T) -> element(2, T). return(#{index := IncomingRaftIdx} = Meta, ConsumerId, Returned, - Effects0, #?MODULE{service_queue = SQ0} = State0) -> + Effects0, State0) -> {State1, Effects1} = maps:fold( fun(MsgId, {Tag, _} = Msg, {S0, E0}) when Tag == '$prefix_msg'; Tag == '$empty_msg'-> - return_one(MsgId, 0, Msg, S0, E0, ConsumerId); + return_one(Meta, MsgId, 0, Msg, S0, E0, ConsumerId); (MsgId, {MsgNum, Msg}, {S0, E0}) -> - return_one(MsgId, MsgNum, Msg, S0, E0, + return_one(Meta, MsgId, MsgNum, Msg, S0, E0, ConsumerId) end, {State0, Effects0}, Returned), - {State2, Effects3} = + State2 = case State1#?MODULE.consumers of - #{ConsumerId := Con0} = Cons0 -> + #{ConsumerId := Con0} -> Con = Con0#consumer{credit = increase_credit(Con0, map_size(Returned))}, - {Cons, SQ, Effects2} = update_or_remove_sub(ConsumerId, Con, - Cons0, SQ0, Effects1), - {State1#?MODULE{consumers = Cons, - service_queue = SQ}, Effects2}; + update_or_remove_sub(Meta, ConsumerId, Con, State1); _ -> - {State1, Effects1} + State1 end, - {State, ok, Effects} = checkout(Meta, State0, State2, Effects3), + {State, ok, Effects} = checkout(Meta, State0, State2, Effects1), update_smallest_raft_index(IncomingRaftIdx, State, Effects). % used to processes messages that are finished -complete(ConsumerId, Discarded, - #consumer{checked_out = Checked} = Con0, Effects0, - #?MODULE{consumers = Cons0, service_queue = SQ0, - ra_indexes = Indexes0} = State0) -> +complete(Meta, ConsumerId, Discarded, + #consumer{checked_out = Checked} = Con0, Effects, + #?MODULE{ra_indexes = Indexes0} = State0) -> %% TODO optimise use of Discarded map here MsgRaftIdxs = [RIdx || {_, {RIdx, _}} <- maps:values(Discarded)], %% credit_mode = simple_prefetch should automatically top-up credit %% as messages are simple_prefetch or otherwise returned Con = Con0#consumer{checked_out = maps:without(maps:keys(Discarded), Checked), credit = increase_credit(Con0, map_size(Discarded))}, - {Cons, SQ, Effects} = update_or_remove_sub(ConsumerId, Con, Cons0, - SQ0, Effects0), + State1 = update_or_remove_sub(Meta, ConsumerId, Con, State0), Indexes = lists:foldl(fun rabbit_fifo_index:delete/2, Indexes0, MsgRaftIdxs), %% TODO: use maps:fold instead - State1 = lists:foldl(fun({_, {_, {Header, _}}}, Acc) -> + State2 = lists:foldl(fun({_, {_, {Header, _}}}, Acc) -> add_bytes_settle(Header, Acc); ({'$prefix_msg', Header}, Acc) -> add_bytes_settle(Header, Acc); ({'$empty_msg', Header}, Acc) -> add_bytes_settle(Header, Acc) - end, State0, maps:values(Discarded)), - {State1#?MODULE{consumers = Cons, - ra_indexes = Indexes, - service_queue = SQ}, Effects}. + end, State1, maps:values(Discarded)), + {State2#?MODULE{ra_indexes = Indexes}, Effects}. increase_credit(#consumer{lifetime = once, credit = Credit}, _) -> @@ -1261,7 +1265,7 @@ complete_and_checkout(#{index := IncomingRaftIdx} = Meta, MsgIds, ConsumerId, #consumer{checked_out = Checked0} = Con0, Effects0, State0) -> Discarded = maps:with(MsgIds, Checked0), - {State2, Effects1} = complete(ConsumerId, Discarded, Con0, + {State2, Effects1} = complete(Meta, ConsumerId, Discarded, Con0, Effects0, State0), {State, ok, Effects} = checkout(Meta, State0, State2, Effects1), update_smallest_raft_index(IncomingRaftIdx, State, Effects). @@ -1345,7 +1349,7 @@ update_header(Key, UpdateFun, Default, Header) -> maps:update_with(Key, UpdateFun, Default, Header). -return_one(MsgId, 0, {Tag, Header0}, +return_one(Meta, MsgId, 0, {Tag, Header0}, #?MODULE{returns = Returns, consumers = Consumers, cfg = #cfg{delivery_limit = DeliveryLimit}} = State0, @@ -1356,7 +1360,7 @@ return_one(MsgId, 0, {Tag, Header0}, Msg0 = {Tag, Header}, case maps:get(delivery_count, Header) of DeliveryCount when DeliveryCount > DeliveryLimit -> - complete(ConsumerId, #{MsgId => Msg0}, Con0, Effects0, State0); + complete(Meta, ConsumerId, #{MsgId => Msg0}, Con0, Effects0, State0); _ -> %% this should not affect the release cursor in any way Con = Con0#consumer{checked_out = maps:remove(MsgId, Checked)}, @@ -1376,7 +1380,7 @@ return_one(MsgId, 0, {Tag, Header0}, returns = lqueue:in(Msg, Returns)}), Effects0} end; -return_one(MsgId, MsgNum, {RaftId, {Header0, RawMsg}}, +return_one(Meta, MsgId, MsgNum, {RaftId, {Header0, RawMsg}}, #?MODULE{returns = Returns, consumers = Consumers, cfg = #cfg{delivery_limit = DeliveryLimit}} = State0, @@ -1389,7 +1393,7 @@ return_one(MsgId, MsgNum, {RaftId, {Header0, RawMsg}}, DlMsg = {MsgNum, Msg0}, Effects = dead_letter_effects(delivery_limit, #{none => DlMsg}, State0, Effects0), - complete(ConsumerId, #{MsgId => DlMsg}, Con0, Effects, State0); + complete(Meta, ConsumerId, #{MsgId => DlMsg}, Con0, Effects, State0); _ -> Con = Con0#consumer{checked_out = maps:remove(MsgId, Checked)}, %% this should not affect the release cursor in any way @@ -1411,23 +1415,23 @@ return_one(MsgId, MsgNum, {RaftId, {Header0, RawMsg}}, Effects0} end. -return_all(#?MODULE{consumers = Cons} = State0, Effects0, ConsumerId, +return_all(Meta, #?MODULE{consumers = Cons} = State0, Effects0, ConsumerId, #consumer{checked_out = Checked0} = Con) -> %% need to sort the list so that we return messages in the order %% they were checked out Checked = lists:sort(maps:to_list(Checked0)), State = State0#?MODULE{consumers = Cons#{ConsumerId => Con}}, lists:foldl(fun ({MsgId, {'$prefix_msg', _} = Msg}, {S, E}) -> - return_one(MsgId, 0, Msg, S, E, ConsumerId); + return_one(Meta, MsgId, 0, Msg, S, E, ConsumerId); ({MsgId, {'$empty_msg', _} = Msg}, {S, E}) -> - return_one(MsgId, 0, Msg, S, E, ConsumerId); + return_one(Meta, MsgId, 0, Msg, S, E, ConsumerId); ({MsgId, {MsgNum, Msg}}, {S, E}) -> - return_one(MsgId, MsgNum, Msg, S, E, ConsumerId) + return_one(Meta, MsgId, MsgNum, Msg, S, E, ConsumerId) end, {State, Effects0}, Checked). %% checkout new messages to consumers -checkout(#{index := Index}, OldState, State0, Effects0) -> - {State1, _Result, Effects1} = checkout0(checkout_one(State0), +checkout(#{index := Index} = Meta, OldState, State0, Effects0) -> + {State1, _Result, Effects1} = checkout0(Meta, checkout_one(Meta, State0), Effects0, {#{}, #{}}), case evaluate_limit(Index, false, OldState, State1, Effects1) of {State, true, Effects} -> @@ -1436,21 +1440,21 @@ checkout(#{index := Index}, OldState, State0, Effects0) -> {State, ok, Effects} end. -checkout0({success, ConsumerId, MsgId, {RaftIdx, {Header, 'empty'}}, State}, +checkout0(Meta, {success, ConsumerId, MsgId, {RaftIdx, {Header, 'empty'}}, State}, Effects, {SendAcc, LogAcc0}) -> DelMsg = {RaftIdx, {MsgId, Header}}, LogAcc = maps:update_with(ConsumerId, fun (M) -> [DelMsg | M] end, [DelMsg], LogAcc0), - checkout0(checkout_one(State), Effects, {SendAcc, LogAcc}); -checkout0({success, ConsumerId, MsgId, Msg, State}, Effects, + checkout0(Meta, checkout_one(Meta, State), Effects, {SendAcc, LogAcc}); +checkout0(Meta, {success, ConsumerId, MsgId, Msg, State}, Effects, {SendAcc0, LogAcc}) -> DelMsg = {MsgId, Msg}, SendAcc = maps:update_with(ConsumerId, fun (M) -> [DelMsg | M] end, [DelMsg], SendAcc0), - checkout0(checkout_one(State), Effects, {SendAcc, LogAcc}); -checkout0({Activity, State0}, Effects0, {SendAcc, LogAcc}) -> + checkout0(Meta, checkout_one(Meta, State), Effects, {SendAcc, LogAcc}); +checkout0(_Meta, {Activity, State0}, Effects0, {SendAcc, LogAcc}) -> Effects1 = case Activity of nochange -> append_send_msg_effects( @@ -1610,7 +1614,7 @@ reply_log_effect(RaftIdx, MsgId, Header, Ready, From) -> {dequeue, {MsgId, {Header, Msg}}, Ready}}}] end}. -checkout_one(#?MODULE{service_queue = SQ0, +checkout_one(Meta, #?MODULE{service_queue = SQ0, messages = Messages0, consumers = Cons0} = InitState) -> case queue:peek(SQ0) of @@ -1625,11 +1629,11 @@ checkout_one(#?MODULE{service_queue = SQ0, %% no credit but was still on queue %% can happen when draining %% recurse without consumer on queue - checkout_one(InitState#?MODULE{service_queue = SQ1}); + checkout_one(Meta, InitState#?MODULE{service_queue = SQ1}); {ok, #consumer{status = cancelled}} -> - checkout_one(InitState#?MODULE{service_queue = SQ1}); + checkout_one(Meta, InitState#?MODULE{service_queue = SQ1}); {ok, #consumer{status = suspected_down}} -> - checkout_one(InitState#?MODULE{service_queue = SQ1}); + checkout_one(Meta, InitState#?MODULE{service_queue = SQ1}); {ok, #consumer{checked_out = Checked0, next_msg_id = Next, credit = Credit, @@ -1639,11 +1643,9 @@ checkout_one(#?MODULE{service_queue = SQ0, next_msg_id = Next + 1, credit = Credit - 1, delivery_count = DelCnt + 1}, - {Cons, SQ, []} = % we expect no effects - update_or_remove_sub(ConsumerId, Con, - Cons0, SQ1, []), - State1 = State0#?MODULE{service_queue = SQ, - consumers = Cons}, + State1 = update_or_remove_sub(Meta, + ConsumerId, Con, + State0#?MODULE{service_queue = SQ1}), {State, Msg} = case ConsumerMsg of {'$prefix_msg', Header} -> @@ -1665,7 +1667,7 @@ checkout_one(#?MODULE{service_queue = SQ0, {success, ConsumerId, Next, Msg, State}; error -> %% consumer did not exist but was queued, recurse - checkout_one(InitState#?MODULE{service_queue = SQ1}) + checkout_one(Meta, InitState#?MODULE{service_queue = SQ1}) end; empty -> {nochange, InitState} @@ -1677,32 +1679,34 @@ checkout_one(#?MODULE{service_queue = SQ0, end end. -update_or_remove_sub(ConsumerId, #consumer{lifetime = auto, - credit = 0} = Con, - Cons, ServiceQueue, Effects) -> - {maps:put(ConsumerId, Con, Cons), ServiceQueue, Effects}; -update_or_remove_sub(ConsumerId, #consumer{lifetime = auto} = Con, - Cons, ServiceQueue, Effects) -> - {maps:put(ConsumerId, Con, Cons), - uniq_queue_in(ConsumerId, ServiceQueue), Effects}; -update_or_remove_sub(ConsumerId, #consumer{lifetime = once, +update_or_remove_sub(_Meta, ConsumerId, #consumer{lifetime = auto, + credit = 0} = Con, + #?MODULE{consumers = Cons} = State) -> + State#?MODULE{consumers = maps:put(ConsumerId, Con, Cons)}; +update_or_remove_sub(_Meta, ConsumerId, #consumer{lifetime = auto} = Con, + #?MODULE{consumers = Cons, + service_queue = ServiceQueue} = State) -> + State#?MODULE{consumers = maps:put(ConsumerId, Con, Cons), + service_queue = uniq_queue_in(ConsumerId, ServiceQueue)}; +update_or_remove_sub(#{system_time := Ts}, + ConsumerId, #consumer{lifetime = once, checked_out = Checked, credit = 0} = Con, - Cons, ServiceQueue, Effects) -> - case maps:size(Checked) of + #?MODULE{consumers = Cons} = State) -> + case maps:size(Checked) of 0 -> % we're done with this consumer - % TODO: demonitor consumer pid but _only_ if there are no other - % monitors for this pid - {maps:remove(ConsumerId, Cons), ServiceQueue, Effects}; + State#?MODULE{consumers = maps:remove(ConsumerId, Cons), + last_active = Ts}; _ -> % there are unsettled items so need to keep around - {maps:put(ConsumerId, Con, Cons), ServiceQueue, Effects} + State#?MODULE{consumers = maps:put(ConsumerId, Con, Cons)} end; -update_or_remove_sub(ConsumerId, #consumer{lifetime = once} = Con, - Cons, ServiceQueue, Effects) -> - {maps:put(ConsumerId, Con, Cons), - uniq_queue_in(ConsumerId, ServiceQueue), Effects}. +update_or_remove_sub(_Meta, ConsumerId, #consumer{lifetime = once} = Con, + #?MODULE{consumers = Cons, + service_queue = ServiceQueue} = State) -> + State#?MODULE{consumers = maps:put(ConsumerId, Con, Cons), + service_queue = uniq_queue_in(ConsumerId, ServiceQueue)}. uniq_queue_in(Key, Queue) -> % TODO: queue:member could surely be quite expensive, however the practical @@ -1799,7 +1803,7 @@ dehydrate_state(#?MODULE{messages = Messages, PPCnt + lqueue:len(Messages), PrefMsgs}, waiting_consumers = Waiting}. -%% TODO make body recursive to avoid lists:reverse +%% TODO make body recursive to avoid allocating lists:reverse call dehydrate_messages(Msgs0, Acc0) -> {OutRes, Msgs} = lqueue:out(Msgs0), case OutRes of @@ -1866,7 +1870,7 @@ make_checkout(ConsumerId, Spec, Meta) -> spec = Spec, meta = Meta}. -spec make_settle(consumer_id(), [msg_id()]) -> protocol(). -make_settle(ConsumerId, MsgIds) -> +make_settle(ConsumerId, MsgIds) when is_list(MsgIds) -> #settle{consumer_id = ConsumerId, msg_ids = MsgIds}. -spec make_return(consumer_id(), [msg_id()]) -> protocol(). @@ -2024,3 +2028,18 @@ suspected_pids_for(Node, #?MODULE{consumers = Cons0, [P | Acc]; (_, Acc) -> Acc end, Enqs, WaitingConsumers0). + +is_expired(Ts, #?MODULE{cfg = #cfg{expires = Expires}, + last_active = LastActive, + consumers = Consumers}) + when is_number(LastActive) andalso is_number(Expires) -> + %% TODO: should it be active consumers? + Active = maps:filter(fun (_, #consumer{status = suspected_down}) -> + false; + (_, _) -> + true + end, Consumers), + + Ts > (LastActive + Expires) andalso maps:size(Active) == 0; +is_expired(_Ts, _State) -> + false. diff --git a/src/rabbit_fifo.hrl b/src/rabbit_fifo.hrl index f9b0d5e1c3..138b19d390 100644 --- a/src/rabbit_fifo.hrl +++ b/src/rabbit_fifo.hrl @@ -101,6 +101,8 @@ -type consumer_strategy() :: competing | single_active. +-type milliseconds() :: non_neg_integer(). + -record(enqueuer, {next_seqno = 1 :: msg_seqno(), % out of order enqueues - sorted list @@ -130,6 +132,7 @@ delivery_limit :: option(non_neg_integer()), max_in_memory_length :: option(non_neg_integer()), max_in_memory_bytes :: option(non_neg_integer()), + expires :: undefined | milliseconds(), unused_1, unused_2 }). @@ -184,6 +187,7 @@ waiting_consumers = [] :: [{consumer_id(), consumer()}], msg_bytes_in_memory = 0 :: non_neg_integer(), msgs_ready_in_memory = 0 :: non_neg_integer(), + last_active :: undefined | non_neg_integer(), unused_1, unused_2 }). @@ -199,4 +203,7 @@ max_in_memory_bytes => non_neg_integer(), overflow_strategy => drop_head | reject_publish, single_active_consumer_on => boolean(), - delivery_limit => non_neg_integer()}. + delivery_limit => non_neg_integer(), + expires => non_neg_integer(), + created => non_neg_integer() + }. diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index 24f7e4354e..aa7a12fbaa 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -19,7 +19,7 @@ -export([cluster_state/1, status/2]). -export([update_consumer_handler/8, update_consumer/9]). -export([cancel_consumer_handler/2, cancel_consumer/3]). --export([become_leader/2, handle_tick/3]). +-export([become_leader/2, handle_tick/3, spawn_deleter/1]). -export([rpc_delete_metrics/1]). -export([format/1]). -export([open_files/1]). @@ -165,6 +165,9 @@ ra_machine_config(Q) when ?is_amqqueue(Q) -> MaxMemoryLength = args_policy_lookup(<<"max-in-memory-length">>, fun min/2, Q), MaxMemoryBytes = args_policy_lookup(<<"max-in-memory-bytes">>, fun min/2, Q), DeliveryLimit = args_policy_lookup(<<"delivery-limit">>, fun min/2, Q), + Expires = args_policy_lookup(<<"expires">>, + fun (A, _B) -> A end, + Q), #{name => Name, queue_resource => QName, dead_letter_handler => dlx_mfa(Q), @@ -175,7 +178,9 @@ ra_machine_config(Q) when ?is_amqqueue(Q) -> max_in_memory_bytes => MaxMemoryBytes, single_active_consumer_on => single_active_consumer_on(Q), delivery_limit => DeliveryLimit, - overflow_strategy => overflow(Overflow, drop_head) + overflow_strategy => overflow(Overflow, drop_head), + created => erlang:system_time(millisecond), + expires => Expires }. single_active_consumer_on(Q) -> @@ -305,6 +310,7 @@ is_policy_applicable(_Q, Policy) -> Applicable = [<<"max-length">>, <<"max-length-bytes">>, <<"overflow">>, + <<"expires">>, <<"max-in-memory-length">>, <<"max-in-memory-bytes">>, <<"delivery-limit">>, @@ -319,6 +325,12 @@ rpc_delete_metrics(QName) -> ets:delete(queue_metrics, QName), ok. +spawn_deleter(QName) -> + spawn(fun () -> + {ok, Q} = rabbit_amqqueue:lookup(QName), + delete(Q, false, false, <<"expired">>) + end). + handle_tick(QName, {Name, MR, MU, M, C, MsgBytesReady, MsgBytesUnack}, Nodes) -> @@ -1351,7 +1363,7 @@ maybe_send_reply(_ChPid, undefined) -> ok; maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg). check_invalid_arguments(QueueName, Args) -> - Keys = [<<"x-expires">>, <<"x-message-ttl">>, + Keys = [<<"x-message-ttl">>, <<"x-max-priority">>, <<"x-queue-mode">>], [case rabbit_misc:table_lookup(Args, Key) of undefined -> ok; diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl index ff09a70c98..cc54aae78e 100644 --- a/test/quorum_queue_SUITE.erl +++ b/test/quorum_queue_SUITE.erl @@ -130,7 +130,8 @@ all_tests() -> consumer_metrics, invalid_policy, delete_if_empty, - delete_if_unused + delete_if_unused, + queue_ttl ]. memory_tests() -> @@ -340,11 +341,6 @@ declare_invalid_args(Config) -> {{shutdown, {server_initiated_close, 406, _}}, _}, declare(rabbit_ct_client_helpers:open_channel(Config, Server), LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, - {<<"x-expires">>, long, 2000}])), - ?assertExit( - {{shutdown, {server_initiated_close, 406, _}}, _}, - declare(rabbit_ct_client_helpers:open_channel(Config, Server), - LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, {<<"x-message-ttl">>, long, 2000}])), ?assertExit( @@ -2462,6 +2458,29 @@ delete_if_unused(Config) -> amqp_channel:call(Ch, #'queue.delete'{queue = QQ, if_unused = true})). +queue_ttl(Config) -> + Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, + {<<"x-expires">>, long, 1000}])), + timer:sleep(5500), + %% check queue no longer exists + ?assertExit( + {{shutdown, + {server_initiated_close,404, + <<"NOT_FOUND - no queue 'queue_ttl' in vhost '/'">>}}, + _}, + amqp_channel:call(Ch, #'queue.declare'{queue = QQ, + passive = true, + durable = true, + auto_delete = false, + arguments = [{<<"x-queue-type">>, longstr, <<"quorum">>}, + {<<"x-expires">>, long, 1000}]})), + ok. + %%---------------------------------------------------------------------------- declare(Ch, Q) -> diff --git a/test/rabbit_fifo_SUITE.erl b/test/rabbit_fifo_SUITE.erl index ce2e4d435a..bbd8bd7df8 100644 --- a/test/rabbit_fifo_SUITE.erl +++ b/test/rabbit_fifo_SUITE.erl @@ -802,7 +802,7 @@ single_active_consumer_cancel_consumer_when_channel_is_down_test(_) -> State1 = lists:foldl(AddConsumer, State0, Consumers), % the channel of the active consumer goes down - {State2, _, Effects} = apply(#{index => 2}, {down, Pid1, noproc}, State1), + {State2, _, Effects} = apply(meta(2), {down, Pid1, noproc}, State1), % fell back to another consumer ?assertEqual(1, map_size(State2#rabbit_fifo.consumers)), % there are still waiting consumers @@ -815,7 +815,7 @@ single_active_consumer_cancel_consumer_when_channel_is_down_test(_) -> update_consumer_handler, _}, Effects), % the channel of the active consumer and a waiting consumer goes down - {State3, _, Effects2} = apply(#{index => 3}, {down, Pid2, noproc}, State2), + {State3, _, Effects2} = apply(meta(3), {down, Pid2, noproc}, State2), % fell back to another consumer ?assertEqual(1, map_size(State3#rabbit_fifo.consumers)), % no more waiting consumer @@ -829,7 +829,7 @@ single_active_consumer_cancel_consumer_when_channel_is_down_test(_) -> update_consumer_handler, _}, Effects2), % the last channel goes down - {State4, _, Effects3} = apply(#{index => 4}, {down, Pid3, doesnotmatter}, State3), + {State4, _, Effects3} = apply(meta(4), {down, Pid3, doesnotmatter}, State3), % no more consumers ?assertEqual(0, map_size(State4#rabbit_fifo.consumers)), ?assertEqual(0, length(State4#rabbit_fifo.waiting_consumers)), @@ -1168,11 +1168,11 @@ active_flag_not_updated_when_consumer_suspected_unsuspected_and_single_active_co [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]), - {State2, _, Effects2} = apply(#{index => 2}, {down, Pid1, noconnection}, State1), + {State2, _, Effects2} = apply(meta(2), {down, Pid1, noconnection}, State1), % one monitor and one consumer status update (deactivated) ?assertEqual(3, length(Effects2)), - {_, _, Effects3} = apply(#{index => 3}, {nodeup, node(self())}, State2), + {_, _, Effects3} = apply(meta(3), {nodeup, node(self())}, State2), % for each consumer: 1 effect to monitor the consumer PID ?assertEqual(5, length(Effects3)). @@ -1403,7 +1403,12 @@ purge_nodes_test(_) -> ok. meta(Idx) -> - #{index => Idx, term => 1, + meta(Idx, 0). + +meta(Idx, Timestamp) -> + #{index => Idx, + term => 1, + system_time => Timestamp, from => {make_ref(), self()}}. enq(Idx, MsgSeq, Msg, State) -> @@ -1509,6 +1514,110 @@ machine_version_test(_) -> ?assertEqual(1, lqueue:len(Msgs)), ok. +queue_ttl_test(_) -> + QName = rabbit_misc:r(<<"/">>, queue, <<"test">>), + Conf = #{name => ?FUNCTION_NAME, + queue_resource => QName, + created => 1000, + expires => 1000}, + S0 = rabbit_fifo:init(Conf), + Now = 1500, + [{mod_call, _, handle_tick, _}] = rabbit_fifo:tick(Now, S0), + %% this should delete the queue + [{mod_call, rabbit_quorum_queue, spawn_deleter, [QName]}] + = rabbit_fifo:tick(Now + 1000, S0), + %% adding a consumer should not ever trigger deletion + Cid = {<<"cid1">>, self()}, + {S1, _} = check_auto(Cid, 1, S0), + [{mod_call, _, handle_tick, _}] = rabbit_fifo:tick(Now, S1), + [{mod_call, _, handle_tick, _}] = rabbit_fifo:tick(Now + 1000, S1), + %% cancelling the consumer should then + {S2, _, _} = apply(meta(2, Now), + rabbit_fifo:make_checkout(Cid, cancel, #{}), S1), + %% last_active should have been reset when consumer was cancelled + %% last_active = 2500 + [{mod_call, _, handle_tick, _}] = rabbit_fifo:tick(Now + 1000, S2), + %% but now it should be deleted + [{mod_call, rabbit_quorum_queue, spawn_deleter, [QName]}] + = rabbit_fifo:tick(Now + 2500, S2), + + %% Same for downs + {S2D, _, _} = apply(meta(2, Now), + {down, self(), noconnection}, S1), + %% last_active should have been reset when consumer was cancelled + %% last_active = 2500 + [{mod_call, _, handle_tick, _}] = rabbit_fifo:tick(Now + 1000, S2D), + %% but now it should be deleted + [{mod_call, rabbit_quorum_queue, spawn_deleter, [QName]}] + = rabbit_fifo:tick(Now + 2500, S2D), + + %% dequeue should set last applied + {S1Deq, {dequeue, empty}} = + apply(meta(2, Now), + rabbit_fifo:make_checkout(Cid, {dequeue, unsettled}, #{}), + S0), + + [{mod_call, _, handle_tick, _}] = rabbit_fifo:tick(Now + 1000, S1Deq), + %% but now it should be deleted + [{mod_call, rabbit_quorum_queue, spawn_deleter, [QName]}] + = rabbit_fifo:tick(Now + 2500, S1Deq), + %% Enqueue message, + {E1, _, _} = apply(meta(2, Now), + rabbit_fifo:make_enqueue(self(), 1, msg1), S0), + Deq = {<<"deq1">>, self()}, + {E2, {dequeue, {MsgId, _}, _}, _} = + apply(meta(3, Now), + rabbit_fifo:make_checkout(Deq, {dequeue, unsettled}, #{}), + E1), + {E3, _, _} = apply(meta(3, Now + 1000), + rabbit_fifo:make_settle(Deq, [MsgId]), E2), + [{mod_call, _, handle_tick, _}] = rabbit_fifo:tick(Now + 1500, E3), + %% but now it should be deleted + [{mod_call, rabbit_quorum_queue, spawn_deleter, [QName]}] + = rabbit_fifo:tick(Now + 3000, E3), + + ok. + +queue_ttl_with_single_active_consumer_test(_) -> + QName = rabbit_misc:r(<<"/">>, queue, <<"test">>), + Conf = #{name => ?FUNCTION_NAME, + queue_resource => QName, + created => 1000, + expires => 1000, + single_active_consumer_on => true}, + S0 = rabbit_fifo:init(Conf), + Now = 1500, + [{mod_call, _, handle_tick, _}] = rabbit_fifo:tick(Now, S0), + %% this should delete the queue + [{mod_call, rabbit_quorum_queue, spawn_deleter, [QName]}] + = rabbit_fifo:tick(Now + 1000, S0), + %% adding a consumer should not ever trigger deletion + Cid = {<<"cid1">>, self()}, + {S1, _} = check_auto(Cid, 1, S0), + [{mod_call, _, handle_tick, _}] = rabbit_fifo:tick(Now, S1), + [{mod_call, _, handle_tick, _}] = rabbit_fifo:tick(Now + 1000, S1), + %% cancelling the consumer should then + {S2, _, _} = apply(meta(2, Now), + rabbit_fifo:make_checkout(Cid, cancel, #{}), S1), + %% last_active should have been reset when consumer was cancelled + %% last_active = 2500 + [{mod_call, _, handle_tick, _}] = rabbit_fifo:tick(Now + 1000, S2), + %% but now it should be deleted + [{mod_call, rabbit_quorum_queue, spawn_deleter, [QName]}] + = rabbit_fifo:tick(Now + 2500, S2), + + %% Same for downs + {S2D, _, _} = apply(meta(2, Now), + {down, self(), noconnection}, S1), + %% last_active should have been reset when consumer was cancelled + %% last_active = 2500 + [{mod_call, _, handle_tick, _}] = rabbit_fifo:tick(Now + 1000, S2D), + %% but now it should be deleted + [{mod_call, rabbit_quorum_queue, spawn_deleter, [QName]}] + = rabbit_fifo:tick(Now + 2500, S2D), + + ok. + %% Utility init(Conf) -> rabbit_fifo:init(Conf). diff --git a/test/rabbit_fifo_prop_SUITE.erl b/test/rabbit_fifo_prop_SUITE.erl index dd2c7154d0..859db2178f 100644 --- a/test/rabbit_fifo_prop_SUITE.erl +++ b/test/rabbit_fifo_prop_SUITE.erl @@ -1081,7 +1081,7 @@ do_apply(Cmd, #t{effects = Effs, %% down T; _ -> - {St, Effects} = case rabbit_fifo:apply(#{index => Index}, Cmd, S0) of + {St, Effects} = case rabbit_fifo:apply(meta(Index), Cmd, S0) of {S, _, E} when is_list(E) -> {S, E}; {S, _, E} -> @@ -1196,7 +1196,7 @@ test_init(Conf) -> rabbit_fifo:init(maps:merge(Default, Conf)). meta(Idx) -> - #{index => Idx, term => 1}. + #{index => Idx, term => 1, system_time => 0}. make_checkout(Cid, Spec) -> rabbit_fifo:make_checkout(Cid, Spec, #{}). |