diff options
| -rw-r--r-- | src/rabbit_fifo.erl | 548 | ||||
| -rw-r--r-- | src/rabbit_fifo_index.erl | 26 | ||||
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 16 | ||||
| -rw-r--r-- | test/dynamic_qq_SUITE.erl | 5 | ||||
| -rw-r--r-- | test/quorum_queue_SUITE.erl | 69 | ||||
| -rw-r--r-- | test/quorum_queue_utils.erl | 9 | ||||
| -rw-r--r-- | test/rabbit_fifo_SUITE.erl | 11 | ||||
| -rw-r--r-- | test/rabbit_fifo_prop_SUITE.erl | 217 | ||||
| -rw-r--r-- | test/unit_inbroker_parallel_SUITE.erl | 207 |
9 files changed, 764 insertions, 344 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index b62ff62a51..9752e06e34 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -38,6 +38,7 @@ % queries query_messages_ready/1, query_messages_checked_out/1, + query_messages_total/1, query_processes/1, query_ra_indexes/1, query_consumer_count/1, @@ -87,8 +88,13 @@ -type msg() :: {msg_header(), raw_msg()}. %% message with a header map. +-type msg_size() :: non_neg_integer(). +%% the size in bytes of the msg payload + -type indexed_msg() :: {ra_index(), msg()}. +-type prefix_msg() :: {'$prefix_msg', msg_size()}. + -type delivery_msg() :: {msg_id(), msg()}. %% A tuple consisting of the message id and the headered message. @@ -157,7 +163,7 @@ -type applied_mfa() :: {module(), atom(), list()}. % represents a partially applied module call --define(SHADOW_COPY_INTERVAL, 4096 * 4). +-define(SHADOW_COPY_INTERVAL, 4096 * 8). -define(USE_AVG_HALF_LIFE, 10000.0). -record(consumer, @@ -189,6 +195,7 @@ suspected_down = false :: boolean() }). + -record(state, {name :: atom(), queue_resource :: rabbit_types:r('queue'), @@ -202,7 +209,8 @@ next_msg_num = 1 :: msg_in_id(), % list of returned msg_in_ids - when checking out it picks from % this list first before taking low_msg_num - returns = lqueue:new() :: lqueue:lqueue('$prefix_msg' | msg_in_id()), + returns = lqueue:new() :: lqueue:lqueue(prefix_msg() | + {msg_in_id(), indexed_msg()}), % a counter of enqueues - used to trigger shadow copy points enqueue_count = 0 :: non_neg_integer(), % a map containing all the live processes that have ever enqueued @@ -224,19 +232,20 @@ dead_letter_handler :: maybe(applied_mfa()), become_leader_handler :: maybe(applied_mfa()), %% This is a special field that is only used for snapshots - %% It represents the number of queued messages at the time the + %% It represents the queued messages at the time the %% dehydrated snapshot state was cached. %% As release_cursors are only emitted for raft indexes where all %% prior messages no longer contribute to the current state we can - %% replace all message payloads at some index with a single integer - %% to be decremented during `checkout_one' until it's 0 after which - %% it instead takes messages from the `messages' map. + %% replace all message payloads with their sizes (to be used for + %% overflow calculations). %% This is done so that consumers are still served in a deterministic %% order on recovery. - prefix_msg_counts = {0, 0} :: {Return :: non_neg_integer(), - PrefixMsgs :: non_neg_integer()}, + prefix_msgs = {[], []} :: {Return :: [msg_size()], + PrefixMsgs :: [msg_size()]}, msg_bytes_enqueue = 0 :: non_neg_integer(), msg_bytes_checkout = 0 :: non_neg_integer(), + max_length :: maybe(non_neg_integer()), + max_bytes :: maybe(non_neg_integer()), %% whether single active consumer is on or not for this queue consumer_strategy = default :: default | single_active, %% waiting consumers, one is picked active consumer is cancelled or dies @@ -251,6 +260,8 @@ dead_letter_handler => applied_mfa(), become_leader_handler => applied_mfa(), shadow_copy_interval => non_neg_integer(), + max_length => non_neg_integer(), + max_bytes => non_neg_integer(), single_active_consumer_on => boolean()}. -export_type([protocol/0, @@ -272,12 +283,14 @@ init(#{name := Name, queue_resource := Resource} = Conf) -> update_config(Conf, #state{name = Name, - queue_resource = Resource}). + queue_resource = Resource}). update_config(Conf, State) -> DLH = maps:get(dead_letter_handler, Conf, undefined), BLH = maps:get(become_leader_handler, Conf, undefined), SHI = maps:get(shadow_copy_interval, Conf, ?SHADOW_COPY_INTERVAL), + MaxLength = maps:get(max_length, Conf, undefined), + MaxBytes = maps:get(max_bytes, Conf, undefined), ConsumerStrategy = case maps:get(single_active_consumer_on, Conf, false) of true -> single_active; @@ -287,6 +300,8 @@ update_config(Conf, State) -> State#state{dead_letter_handler = DLH, become_leader_handler = BLH, shadow_copy_interval = SHI, + max_length = MaxLength, + max_bytes = MaxBytes, consumer_strategy = ConsumerStrategy}. zero(_) -> @@ -294,59 +309,49 @@ zero(_) -> % msg_ids are scoped per consumer % ra_indexes holds all raft indexes for enqueues currently on queue --spec apply(ra_machine:command_meta_data(), command(), - state()) -> - {state(), Reply :: term(), ra_machine:effects()}. -apply(#{index := RaftIdx}, #enqueue{pid = From, seq = Seq, - msg = RawMsg}, State00) -> - case maybe_enqueue(RaftIdx, From, Seq, RawMsg, [], State00) of - {ok, State0, Effects1} -> - %% need to checkout before capturing the shadow copy else - %% snapshots may not be complete - {State, ok, Effects} = checkout( - add_bytes_enqueue(RawMsg, State0), - Effects1), - append_to_master_index(RaftIdx, Effects, State); - {duplicate, State, Effects} -> - {State, ok, lists:reverse(Effects)} - end; -apply(#{index := RaftIdx}, +-spec apply(ra_machine:command_meta_data(), command(), state()) -> + {state(), Reply :: term(), ra_machine:effects()} | + {state(), Reply :: term()}. +apply(Metadata, #enqueue{pid = From, seq = Seq, + msg = RawMsg}, State00) -> + apply_enqueue(Metadata, From, Seq, RawMsg, State00); +apply(Meta, #settle{msg_ids = MsgIds, consumer_id = ConsumerId}, #state{consumers = Cons0} = State) -> case Cons0 of #{ConsumerId := Con0} -> % need to increment metrics before completing as any snapshot % states taken need to includ them - complete_and_checkout(RaftIdx, MsgIds, ConsumerId, + complete_and_checkout(Meta, MsgIds, ConsumerId, Con0, [], State); _ -> {State, ok} end; -apply(#{index := RaftIdx}, #discard{msg_ids = MsgIds, consumer_id = ConsumerId}, +apply(Meta, #discard{msg_ids = MsgIds, consumer_id = ConsumerId}, #state{consumers = Cons0} = State0) -> case Cons0 of #{ConsumerId := Con0} -> Discarded = maps:with(MsgIds, Con0#consumer.checked_out), Effects = dead_letter_effects(Discarded, State0, []), - complete_and_checkout(RaftIdx, MsgIds, ConsumerId, Con0, + complete_and_checkout(Meta, MsgIds, ConsumerId, Con0, Effects, State0); _ -> {State0, ok} end; -apply(_, #return{msg_ids = MsgIds, consumer_id = ConsumerId}, +apply(Meta, #return{msg_ids = MsgIds, consumer_id = ConsumerId}, #state{consumers = Cons0} = State) -> case Cons0 of #{ConsumerId := Con0 = #consumer{checked_out = Checked0}} -> Checked = maps:without(MsgIds, Checked0), Returned = maps:with(MsgIds, Checked0), MsgNumMsgs = maps:values(Returned), - return(ConsumerId, MsgNumMsgs, Con0, Checked, [], State); + return(Meta, ConsumerId, MsgNumMsgs, Con0, Checked, [], State); _ -> {State, ok} end; -apply(_, #credit{credit = NewCredit, delivery_count = RemoteDelCnt, - drain = Drain, consumer_id = ConsumerId}, +apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt, + drain = Drain, consumer_id = ConsumerId}, #state{consumers = Cons0, service_queue = ServiceQueue0} = State0) -> case Cons0 of @@ -359,7 +364,7 @@ apply(_, #credit{credit = NewCredit, delivery_count = RemoteDelCnt, ServiceQueue0), Cons = maps:put(ConsumerId, Con1, Cons0), {State1, ok, Effects} = - checkout(State0#state{service_queue = ServiceQueue, + checkout(Meta, State0#state{service_queue = ServiceQueue, consumers = Cons}, []), Response = {send_credit_reply, maps:size(State1#state.messages)}, %% by this point all checkouts for the updated credit value @@ -389,46 +394,45 @@ apply(_, #credit{credit = NewCredit, delivery_count = RemoteDelCnt, %% credit for unknown consumer - just ignore {State0, ok} end; -apply(_, #checkout{spec = {dequeue, _}}, - #state{messages = M, - prefix_msg_counts = {0, 0}} = State0) when map_size(M) == 0 -> - %% FIXME: also check if there are returned messages - %% TODO do we need metric visibility of empty get requests? - {State0, {dequeue, empty}}; -apply(Meta, #checkout{spec = {dequeue, settled}, meta = ConsumerMeta, +apply(Meta, #checkout{spec = {dequeue, Settlement}, + meta = ConsumerMeta, consumer_id = ConsumerId}, - State0) -> - % TODO: this clause could probably be optimised - State1 = update_consumer(ConsumerId, ConsumerMeta, - {once, 1, simple_prefetch}, State0), - % turn send msg effect into reply - {success, _, MsgId, Msg, State2} = checkout_one(State1), - % immediately settle - {State, _, Effects} = apply(Meta, make_settle(ConsumerId, [MsgId]), State2), - {State, {dequeue, {MsgId, Msg}}, Effects}; -apply(_, #checkout{spec = {dequeue, unsettled}, - meta = ConsumerMeta, consumer_id = {_, Pid} = ConsumerId}, - State0) -> - State1 = update_consumer(ConsumerId, ConsumerMeta, - {once, 1, simple_prefetch}, State0), - case checkout_one(State1) of - {success, _, MsgId, Msg, S} -> - {S, {dequeue, {MsgId, Msg}}, [{monitor, process, Pid}]}; - {inactive, S} -> - {S, {dequeue, empty}, [{aux, inactive}]}; - S -> - {S, {dequeue, empty}} + #state{consumers = Consumers} = State0) -> + Exists = maps:is_key(ConsumerId, Consumers), + case messages_ready(State0) of + 0 -> + {State0, {dequeue, empty}}; + _ when Exists -> + %% a dequeue using the same consumer_id isn't possible at this point + {State0, {dequeue, empty}}; + _ -> + State1 = update_consumer(ConsumerId, ConsumerMeta, + {once, 1, simple_prefetch}, State0), + {success, _, MsgId, Msg, State2} = checkout_one(State1), + case Settlement of + unsettled -> + {_, Pid} = ConsumerId, + {State2, {dequeue, {MsgId, Msg}}, + [{monitor, process, Pid}]}; + settled -> + %% immediately settle the checkout + {State, _, Effects} = apply(Meta, + make_settle(ConsumerId, [MsgId]), + State2), + {State, {dequeue, {MsgId, Msg}}, Effects} + end end; -apply(_, #checkout{spec = cancel, consumer_id = ConsumerId}, State0) -> +apply(Meta, #checkout{spec = cancel, consumer_id = ConsumerId}, State0) -> {State, Effects} = cancel_consumer(ConsumerId, State0, []), % TODO: here we should really demonitor the pid but _only_ if it has no - % other consumers or enqueuers. - checkout(State, Effects); -apply(_, #checkout{spec = Spec, meta = Meta, - consumer_id = {_, Pid} = ConsumerId}, + % other consumers or enqueuers. leaving a monitor in place isn't harmful + % however + checkout(Meta, State, Effects); +apply(Meta, #checkout{spec = Spec, meta = ConsumerMeta, + consumer_id = {_, Pid} = ConsumerId}, State0) -> - State1 = update_consumer(ConsumerId, Meta, Spec, State0), - checkout(State1, [{monitor, process, Pid}]); + State1 = update_consumer(ConsumerId, ConsumerMeta, Spec, State0), + checkout(Meta, State1, [{monitor, process, Pid}]); apply(#{index := RaftIdx}, #purge{}, #state{ra_indexes = Indexes0, messages = Messages} = State0) -> @@ -486,7 +490,7 @@ apply(_, {down, ConsumerPid, noconnection}, %% TODO: should we run a checkout here? {State#state{consumers = Cons, enqueuers = Enqs, waiting_consumers = WaitingConsumers}, ok, Effects}; -apply(_, {down, Pid, _Info}, #state{consumers = Cons0, +apply(Meta, {down, Pid, _Info}, #state{consumers = Cons0, enqueuers = Enqs0} = State0) -> % Remove any enqueuer for the same pid and enqueue any pending messages % This should be ok as we won't see any more enqueues from this pid @@ -506,8 +510,8 @@ apply(_, {down, Pid, _Info}, #state{consumers = Cons0, {State, Effects} = lists:foldl(fun(ConsumerId, {S, E}) -> cancel_consumer(ConsumerId, S, E) end, {State2, Effects1}, DownConsumers), - checkout(State, Effects); -apply(_, {nodeup, Node}, #state{consumers = Cons0, + checkout(Meta, State, Effects); +apply(Meta, {nodeup, Node}, #state{consumers = Cons0, enqueuers = Enqs0, service_queue = SQ0} = State0) -> %% A node we are monitoring has come back. @@ -535,13 +539,13 @@ apply(_, {nodeup, Node}, #state{consumers = Cons0, Acc end, {Cons0, SQ0, Monitors}, Cons0), % TODO: avoid list concat - checkout(State0#state{consumers = Cons1, enqueuers = Enqs1, - service_queue = SQ, - waiting_consumers = WaitingConsumers}, Effects); + checkout(Meta, State0#state{consumers = Cons1, enqueuers = Enqs1, + service_queue = SQ, + waiting_consumers = WaitingConsumers}, Effects); apply(_, {nodedown, _Node}, State) -> {State, ok}; -apply(_, #update_config{config = Conf}, State) -> - {update_config(Conf, State), ok}. +apply(Meta, #update_config{config = Conf}, State) -> + checkout(Meta, update_config(Conf, State), []). handle_waiting_consumer_down(_Pid, #state{consumer_strategy = default} = State) -> @@ -592,7 +596,7 @@ state_enter(leader, #state{consumers = Cons, enqueuers = Enqs, waiting_consumers = WaitingConsumers, name = Name, - prefix_msg_counts = {0, 0}, + prefix_msgs = {[], []}, become_leader_handler = BLH}) -> % return effects to monitor all current consumers and enqueuers Pids = lists:usort(maps:keys(Enqs) @@ -608,10 +612,10 @@ state_enter(leader, #state{consumers = Cons, {Mod, Fun, Args} -> [{mod_call, Mod, Fun, Args ++ [Name]} | Effects] end; -state_enter(recovered, #state{prefix_msg_counts = PrefixMsgCounts}) - when PrefixMsgCounts =/= {0, 0} -> +state_enter(recovered, #state{prefix_msgs = PrefixMsgCounts}) + when PrefixMsgCounts =/= {[], []} -> %% TODO: remove assertion? - exit({rabbit_fifo, unexpected_prefix_msg_counts, PrefixMsgCounts}); + exit({rabbit_fifo, unexpected_prefix_msgs, PrefixMsgCounts}); state_enter(eol, #state{enqueuers = Enqs, consumers = Custs0, waiting_consumers = WaitingConsumers0}) -> @@ -629,14 +633,12 @@ state_enter(_, _) -> -spec tick(non_neg_integer(), state()) -> ra_machine:effects(). tick(_Ts, #state{name = Name, queue_resource = QName, - messages = Messages, - ra_indexes = Indexes, msg_bytes_enqueue = EnqueueBytes, msg_bytes_checkout = CheckoutBytes} = State) -> Metrics = {Name, - maps:size(Messages), % Ready + messages_ready(State), num_checked_out(State), % checked out - rabbit_fifo_index:size(Indexes), %% Total + messages_total(State), query_consumer_count(State), % Consumers EnqueueBytes, CheckoutBytes}, @@ -649,8 +651,7 @@ overview(#state{consumers = Cons, messages = Messages, ra_indexes = Indexes, msg_bytes_enqueue = EnqueueBytes, - msg_bytes_checkout = CheckoutBytes - } = State) -> + msg_bytes_checkout = CheckoutBytes} = State) -> #{type => ?MODULE, num_consumers => maps:size(Cons), num_checked_out => num_checked_out(State), @@ -693,13 +694,17 @@ handle_aux(_, cast, Cmd, {Name, Use0}, Log, _) -> %%% Queries -query_messages_ready(#state{messages = M}) -> - M. +%% TODO: this doesn't take returns into account +query_messages_ready(State) -> + messages_ready(State). query_messages_checked_out(#state{consumers = Consumers}) -> maps:fold(fun (_, #consumer{checked_out = C}, S) -> - maps:merge(S, maps:from_list(maps:values(C))) - end, #{}, Consumers). + maps:size(C) + S + end, 0, Consumers). + +query_messages_total(State) -> + messages_total(State). query_processes(#state{enqueuers = Enqs, consumers = Cons0}) -> Cons = maps:fold(fun({_, P}, V, S) -> S#{P => V} end, #{}, Cons0), @@ -770,6 +775,18 @@ usage(Name) when is_atom(Name) -> %%% Internal +messages_ready(#state{messages = M, + prefix_msgs = {PreR, PreM}, + returns = R}) -> + + %% TODO: optimise to avoid length/1 call + maps:size(M) + lqueue:len(R) + length(PreR) + length(PreM). + +messages_total(#state{ra_indexes = I, + prefix_msgs = {PreR, PreM}}) -> + + rabbit_fifo_index:size(I) + length(PreR) + length(PreM). + update_use({inactive, _, _, _} = CUInfo, inactive) -> CUInfo; update_use({active, _, _} = CUInfo, active) -> @@ -879,6 +896,35 @@ cancel_consumer0(ConsumerId, {S0, Effects0} end. +apply_enqueue(#{index := RaftIdx} = Meta, From, Seq, RawMsg, State0) -> + Bytes = message_size(RawMsg), + case maybe_enqueue(RaftIdx, From, Seq, RawMsg, [], State0) of + {ok, State1, Effects1} -> + State2 = append_to_master_index(RaftIdx, + add_bytes_enqueue(Bytes, State1)), + {State, ok, Effects} = checkout(Meta, State2, Effects1), + {maybe_store_dehydrated_state(RaftIdx, State), ok, Effects}; + {duplicate, State, Effects} -> + {State, ok, Effects} + end. + +drop_head(#state{ra_indexes = Indexes0} = State0, Effects0) -> + case take_next_msg(State0) of + {FullMsg = {_MsgId, {RaftIdxToDrop, {_Header, Msg}}}, + State1} -> + Indexes = rabbit_fifo_index:delete(RaftIdxToDrop, Indexes0), + Bytes = message_size(Msg), + State = add_bytes_drop(Bytes, State1#state{ra_indexes = Indexes}), + Effects = dead_letter_effects(maps:put(none, FullMsg, #{}), + State, Effects0), + {State, Effects}; + {{'$prefix_msg', Bytes}, State1} -> + State = add_bytes_drop(Bytes, State1), + {State, Effects0}; + empty -> + {State0, Effects0} + end. + enqueue(RaftIdx, RawMsg, #state{messages = Messages, low_msg_num = LowMsgNum, next_msg_num = NextMsgNum} = State0) -> @@ -889,19 +935,28 @@ enqueue(RaftIdx, RawMsg, #state{messages = Messages, low_msg_num = min(LowMsgNum, NextMsgNum), next_msg_num = NextMsgNum + 1}. -append_to_master_index(RaftIdx, Effects, +append_to_master_index(RaftIdx, #state{ra_indexes = Indexes0} = State0) -> - {State, Shadow} = incr_enqueue_count(State0), - Indexes = rabbit_fifo_index:append(RaftIdx, Shadow, Indexes0), - {State#state{ra_indexes = Indexes}, ok, Effects}. + State = incr_enqueue_count(State0), + Indexes = rabbit_fifo_index:append(RaftIdx, undefined, Indexes0), + State#state{ra_indexes = Indexes}. incr_enqueue_count(#state{enqueue_count = C, shadow_copy_interval = C} = State0) -> - % time to stash a dehydrated state version - State = State0#state{enqueue_count = 0}, - {State, dehydrate_state(State)}; + % this will trigger a dehydrated version of the state to be stored + % at this raft index for potential future snapshot generation + State0#state{enqueue_count = 0}; incr_enqueue_count(#state{enqueue_count = C} = State) -> - {State#state{enqueue_count = C + 1}, undefined}. + State#state{enqueue_count = C + 1}. + +maybe_store_dehydrated_state(RaftIdx, #state{enqueue_count = 0, + ra_indexes = Indexes} = State) -> + Dehydrated = dehydrate_state(State), + State#state{ra_indexes = + rabbit_fifo_index:update_if_present(RaftIdx, Dehydrated, + Indexes)}; +maybe_store_dehydrated_state(_RaftIdx, State) -> + State. enqueue_pending(From, @@ -916,7 +971,8 @@ enqueue_pending(From, Enq, #state{enqueuers = Enqueuers0} = State) -> maybe_enqueue(RaftIdx, undefined, undefined, RawMsg, Effects, State0) -> % direct enqueue without tracking - {ok, enqueue(RaftIdx, RawMsg, State0), Effects}; + State = enqueue(RaftIdx, RawMsg, State0), + {ok, State, Effects}; maybe_enqueue(RaftIdx, From, MsgSeqNo, RawMsg, Effects0, #state{enqueuers = Enqueuers0} = State0) -> case maps:get(From, Enqueuers0, undefined) of @@ -947,19 +1003,19 @@ maybe_enqueue(RaftIdx, From, MsgSeqNo, RawMsg, Effects0, snd(T) -> element(2, T). -return(ConsumerId, MsgNumMsgs, Con0, Checked, +return(Meta, ConsumerId, MsgNumMsgs, Con0, Checked, Effects0, #state{consumers = Cons0, service_queue = SQ0} = State0) -> Con = Con0#consumer{checked_out = Checked, credit = increase_credit(Con0, length(MsgNumMsgs))}, {Cons, SQ, Effects} = update_or_remove_sub(ConsumerId, Con, Cons0, SQ0, Effects0), - State1 = lists:foldl(fun('$prefix_msg' = Msg, S0) -> + State1 = lists:foldl(fun({'$prefix_msg', _} = Msg, S0) -> return_one(0, Msg, S0); ({MsgNum, Msg}, S0) -> return_one(MsgNum, Msg, S0) end, State0, MsgNumMsgs), - checkout(State1#state{consumers = Cons, - service_queue = SQ}, + checkout(Meta, State1#state{consumers = Cons, + service_queue = SQ}, Effects). % used to processes messages that are finished @@ -990,7 +1046,7 @@ increase_credit(#consumer{lifetime = auto, increase_credit(#consumer{credit = Current}, Credit) -> Current + Credit. -complete_and_checkout(IncomingRaftIdx, MsgIds, ConsumerId, +complete_and_checkout(#{index := IncomingRaftIdx} = Meta, MsgIds, ConsumerId, #consumer{checked_out = Checked0} = Con0, Effects0, #state{ra_indexes = Indexes0} = State0) -> Checked = maps:without(MsgIds, Checked0), @@ -998,15 +1054,15 @@ complete_and_checkout(IncomingRaftIdx, MsgIds, ConsumerId, MsgRaftIdxs = [RIdx || {_, {RIdx, _}} <- maps:values(Discarded)], State1 = lists:foldl(fun({_, {_, {_, RawMsg}}}, Acc) -> add_bytes_settle(RawMsg, Acc); - (_, Acc) -> - Acc + ({'$prefix_msg', _} = M, Acc) -> + add_bytes_settle(M, Acc) end, State0, maps:values(Discarded)), %% need to pass the length of discarded as $prefix_msgs would be filtered %% by the above list comprehension {State2, Effects1} = complete(ConsumerId, MsgRaftIdxs, maps:size(Discarded), Con0, Checked, Effects0, State1), - {State, ok, Effects} = checkout(State2, Effects1), + {State, ok, Effects} = checkout(Meta, State2, Effects1), % settle metrics are incremented separately update_smallest_raft_index(IncomingRaftIdx, Indexes0, State, Effects). @@ -1016,7 +1072,7 @@ dead_letter_effects(_Discarded, Effects; dead_letter_effects(Discarded, #state{dead_letter_handler = {Mod, Fun, Args}}, Effects) -> - DeadLetters = maps:fold(fun(_, {_, {_, {_, Msg}}}, + DeadLetters = maps:fold(fun(_, {_, {_, {_Header, Msg}}}, % MsgId, MsgIdID, RaftId, Header Acc) -> [{rejected, Msg} | Acc] end, [], Discarded), @@ -1028,7 +1084,6 @@ cancel_consumer_effects(ConsumerId, #state{queue_resource = QName}, Effects) -> update_smallest_raft_index(IncomingRaftIdx, OldIndexes, #state{ra_indexes = Indexes, - % prefix_msg_count = 0, messages = Messages} = State, Effects) -> case rabbit_fifo_index:size(Indexes) of 0 when map_size(Messages) =:= 0 -> @@ -1057,35 +1112,45 @@ update_smallest_raft_index(IncomingRaftIdx, OldIndexes, end. % TODO update message then update messages and returns in single operations -return_one(0, '$prefix_msg', +return_one(0, {'$prefix_msg', _} = Msg, #state{returns = Returns} = State0) -> - State0#state{returns = lqueue:in('$prefix_msg', Returns)}; + add_bytes_return(Msg, + State0#state{returns = lqueue:in(Msg, Returns)}); return_one(MsgNum, {RaftId, {Header0, RawMsg}}, - #state{messages = Messages, - returns = Returns} = State0) -> + #state{returns = Returns} = State0) -> Header = maps:update_with(delivery_count, fun (C) -> C+1 end, 1, Header0), Msg = {RaftId, {Header, RawMsg}}, % this should not affect the release cursor in any way add_bytes_return(RawMsg, - State0#state{messages = maps:put(MsgNum, Msg, Messages), - returns = lqueue:in(MsgNum, Returns)}). + State0#state{returns = lqueue:in({MsgNum, Msg}, Returns)}). -return_all(State, Checked0) -> +return_all(State0, Checked0) -> %% need to sort the list so that we return messages in the order %% they were checked out Checked = lists:sort(maps:to_list(Checked0)), - lists:foldl(fun ({_, '$prefix_msg'}, S) -> - return_one(0, '$prefix_msg', S); + lists:foldl(fun ({_, {'$prefix_msg', _} = Msg}, S) -> + return_one(0, Msg, S); ({_, {MsgNum, Msg}}, S) -> return_one(MsgNum, Msg, S) - end, State, Checked). + end, State0, Checked). + + %% checkout new messages to consumers %% reverses the effects list -checkout(State, Effects) -> - checkout0(checkout_one(State), Effects, #{}). +checkout(#{index := Index}, State0, Effects0) -> + {State1, _Result, Effects1} = checkout0(checkout_one(State0), + Effects0, #{}), + case evaluate_limit(State0#state.ra_indexes, false, + State1, Effects1) of + {State, true, Effects} -> + update_smallest_raft_index(Index, State0#state.ra_indexes, + State, Effects); + {State, false, Effects} -> + {State, ok, Effects} + end. checkout0({success, ConsumerId, MsgId, Msg, State}, Effects, Acc0) -> DelMsg = {MsgId, Msg}, @@ -1093,12 +1158,31 @@ checkout0({success, ConsumerId, MsgId, Msg, State}, Effects, Acc0) -> fun (M) -> [DelMsg | M] end, [DelMsg], Acc0), checkout0(checkout_one(State), Effects, Acc); -checkout0({inactive, State}, Effects0, Acc) -> - Effects = append_send_msg_effects(Effects0, Acc), - {State, ok, lists:reverse([{aux, inactive} | Effects])}; -checkout0(State, Effects0, Acc) -> - Effects = append_send_msg_effects(Effects0, Acc), - {State, ok, lists:reverse(Effects)}. +checkout0({Activity, State0}, Effects0, Acc) -> + Effects1 = case Activity of + nochange -> + append_send_msg_effects(Effects0, Acc); + inactive -> + [{aux, inactive} + | append_send_msg_effects(Effects0, Acc)] + end, + {State0, ok, lists:reverse(Effects1)}. + +evaluate_limit(_OldIndexes, Result, + #state{max_length = undefined, + max_bytes = undefined} = State, + Effects) -> + {State, Result, Effects}; +evaluate_limit(OldIndexes, Result, + State0, Effects0) -> + case is_over_limit(State0) of + true -> + {State, Effects} = drop_head(State0, Effects0), + evaluate_limit(OldIndexes, true, State, Effects); + false -> + {State0, Result, Effects0} + %% TODO: optimisation: avoid calling if nothing has been dropped? + end. append_send_msg_effects(Effects, AccMap) when map_size(AccMap) == 0 -> Effects; @@ -1108,57 +1192,51 @@ append_send_msg_effects(Effects0, AccMap) -> end, Effects0, AccMap), [{aux, active} | Effects]. -next_checkout_message(#state{prefix_msg_counts = {PReturns, P}} = State) - when PReturns > 0 -> +%% next message is determined as follows: +%% First we check if there are are prefex returns +%% Then we check if there are current returns +%% then we check prefix msgs +%% then we check current messages +%% +%% When we return it is always done to the current return queue +%% for both prefix messages and current messages +take_next_msg(#state{prefix_msgs = {[Bytes | Rem], P}} = State) -> %% there are prefix returns, these should be served first - {'$prefix_msg', State#state{prefix_msg_counts = {PReturns - 1, P}}}; -next_checkout_message(#state{returns = Returns, - low_msg_num = Low0, - prefix_msg_counts = {R, P}, - next_msg_num = NextMsgNum} = State) -> + {{'$prefix_msg', Bytes}, + State#state{prefix_msgs = {Rem, P}}}; +take_next_msg(#state{returns = Returns, + low_msg_num = Low0, + messages = Messages0, + prefix_msgs = {R, P}} = State) -> %% use peek rather than out there as the most likely case is an empty %% queue case lqueue:peek(Returns) of - {value, Next} -> - {Next, State#state{returns = lqueue:drop(Returns)}}; - empty when P == 0 -> + {value, NextMsg} -> + {NextMsg, + State#state{returns = lqueue:drop(Returns)}}; + empty when P == [] -> case Low0 of undefined -> - {undefined, State}; + empty; _ -> - case Low0 + 1 of - NextMsgNum -> - %% the map will be empty after this item is removed - {Low0, State#state{low_msg_num = undefined}}; - Low -> - {Low0, State#state{low_msg_num = Low}} + %% TODO: defensive? + {Msg, Messages} = maps:take(Low0, Messages0), + case maps:size(Messages) of + 0 -> + {{Low0, Msg}, + State#state{messages = Messages, + low_msg_num = undefined}}; + _ -> + {{Low0, Msg}, + State#state{messages = Messages, + low_msg_num = Low0 + 1}} end end; empty -> + [Bytes | Rem] = P, %% There are prefix msgs - {'$prefix_msg', State#state{prefix_msg_counts = {R, P - 1}}} - end. - -%% next message is determined as follows: -%% First we check if there are are prefex returns -%% Then we check if there are current returns -%% then we check prefix msgs -%% then we check current messages -%% -%% When we return it is always done to the current return queue -%% for both prefix messages and current messages -take_next_msg(#state{messages = Messages0} = State0) -> - case next_checkout_message(State0) of - {'$prefix_msg', State} -> - {'$prefix_msg', State, Messages0}; - {NextMsgInId, State} -> - %% messages are available - case maps:take(NextMsgInId, Messages0) of - {IdxMsg, Messages} -> - {{NextMsgInId, IdxMsg}, State, Messages}; - error -> - error - end + {{'$prefix_msg', Bytes}, + State#state{prefix_msgs = {R, Rem}}} end. send_msg_effect({CTag, CPid}, Msgs) -> @@ -1170,7 +1248,7 @@ checkout_one(#state{service_queue = SQ0, case queue:peek(SQ0) of {value, ConsumerId} -> case take_next_msg(InitState) of - {ConsumerMsg, State0, Messages} -> + {ConsumerMsg, State0} -> SQ1 = queue:drop(SQ0), %% there are consumers waiting to be serviced %% process consumer checkout @@ -1195,31 +1273,31 @@ checkout_one(#state{service_queue = SQ0, update_or_remove_sub(ConsumerId, Con, Cons0, SQ1, []), State1 = State0#state{service_queue = SQ, - messages = Messages, consumers = Cons}, {State, Msg} = case ConsumerMsg of - '$prefix_msg' -> - {State1, '$prefix_msg'}; + {'$prefix_msg', _} -> + {add_bytes_checkout(ConsumerMsg, State1), + ConsumerMsg}; {_, {_, {_, RawMsg} = M}} -> - {add_bytes_checkout(RawMsg, State1), M} + {add_bytes_checkout(RawMsg, State1), + M} end, {success, ConsumerId, Next, Msg, State}; error -> %% consumer did not exist but was queued, recurse checkout_one(InitState#state{service_queue = SQ1}) end; - error -> - InitState + empty -> + {nochange, InitState} end; empty -> case maps:size(Messages0) of - 0 -> InitState; + 0 -> {nochange, InitState}; _ -> {inactive, InitState} end end. - update_or_remove_sub(ConsumerId, #consumer{lifetime = auto, credit = 0} = Con, Cons, ServiceQueue, Effects) -> @@ -1262,14 +1340,14 @@ update_consumer(ConsumerId, Meta, Spec, %% general case, single active consumer off update_consumer0(ConsumerId, Meta, Spec, State0); update_consumer(ConsumerId, Meta, Spec, - #state{consumers = Cons0, + #state{consumers = Cons0, consumer_strategy = single_active} = State0) when map_size(Cons0) == 0 -> %% single active consumer on, no one is consuming yet update_consumer0(ConsumerId, Meta, Spec, State0); update_consumer(ConsumerId, Meta, {Life, Credit, Mode}, - #state{consumer_strategy = single_active, - waiting_consumers = WaitingConsumers0} = State0) -> + #state{consumer_strategy = single_active, + waiting_consumers = WaitingConsumers0} = State0) -> %% single active consumer on and one active consumer already %% adding the new consumer to the waiting list Consumer = #consumer{lifetime = Life, meta = Meta, @@ -1289,11 +1367,10 @@ update_consumer0(ConsumerId, Meta, {Life, Credit, Mode}, %% the credit update N = maps:size(S#consumer.checked_out), C = max(0, Credit - N), - S#consumer{lifetime = Life, - credit = C} + S#consumer{lifetime = Life, credit = C} end, Init, Cons0), ServiceQueue = maybe_queue_consumer(ConsumerId, maps:get(ConsumerId, Cons), - ServiceQueue0), + ServiceQueue0), State0#state{consumers = Cons, service_queue = ServiceQueue}. @@ -1313,13 +1390,20 @@ maybe_queue_consumer(ConsumerId, #consumer{credit = Credit}, dehydrate_state(#state{messages = Messages, consumers = Consumers, returns = Returns, - prefix_msg_counts = {PrefRetCnt, MsgCount}} = State) -> - %% TODO: optimise to avoid having to iterate the queue to get the number - %% of current returned messages - RetLen = lqueue:len(Returns), % O(1) - CurReturns = length([R || R <- lqueue:to_list(Returns), - R =/= '$prefix_msg']), - PrefixMsgCnt = MsgCount + maps:size(Messages) - CurReturns, + prefix_msgs = {PrefRet0, PrefMsg0}} = State) -> + %% TODO: optimise this function as far as possible + PrefRet = lists:foldl(fun ({'$prefix_msg', Bytes}, Acc) -> + [Bytes | Acc]; + ({_, {_, {_, Raw}}}, Acc) -> + [message_size(Raw) | Acc] + end, + lists:reverse(PrefRet0), + lqueue:to_list(Returns)), + PrefMsgs = lists:foldl(fun ({_, {_RaftIdx, {_H, Raw}}}, Acc) -> + [message_size(Raw) | Acc] + end, + lists:reverse(PrefMsg0), + lists:sort(maps:to_list(Messages))), State#state{messages = #{}, ra_indexes = rabbit_fifo_index:empty(), low_msg_num = undefined, @@ -1327,14 +1411,26 @@ dehydrate_state(#state{messages = Messages, dehydrate_consumer(C) end, Consumers), returns = lqueue:new(), - %% messages include returns - prefix_msg_counts = {RetLen + PrefRetCnt, - PrefixMsgCnt}}. + prefix_msgs = {lists:reverse(PrefRet), + lists:reverse(PrefMsgs)}}. dehydrate_consumer(#consumer{checked_out = Checked0} = Con) -> - Checked = maps:map(fun (_, _) -> '$prefix_msg' end, Checked0), + Checked = maps:map(fun (_, {'$prefix_msg', _} = M) -> + M; + (_, {_, {_, {_, Raw}}}) -> + {'$prefix_msg', message_size(Raw)} + end, Checked0), Con#consumer{checked_out = Checked}. +is_over_limit(#state{max_length = undefined, + max_bytes = undefined}) -> + false; +is_over_limit(#state{max_length = MaxLength, + max_bytes = MaxBytes, + msg_bytes_enqueue = BytesEnq} = State) -> + + messages_ready(State) > MaxLength orelse (BytesEnq > MaxBytes). + -spec make_enqueue(maybe(pid()), maybe(msg_seqno()), raw_msg()) -> protocol(). make_enqueue(Pid, Seq, Msg) -> #enqueue{pid = Pid, seq = Seq, msg = Msg}. @@ -1371,10 +1467,12 @@ make_purge() -> #purge{}. make_update_config(Config) -> #update_config{config = Config}. -add_bytes_enqueue(Msg, #state{msg_bytes_enqueue = Enqueue} = State) -> - Bytes = message_size(Msg), +add_bytes_enqueue(Bytes, #state{msg_bytes_enqueue = Enqueue} = State) -> State#state{msg_bytes_enqueue = Enqueue + Bytes}. +add_bytes_drop(Bytes, #state{msg_bytes_enqueue = Enqueue} = State) -> + State#state{msg_bytes_enqueue = Enqueue - Bytes}. + add_bytes_checkout(Msg, #state{msg_bytes_checkout = Checkout, msg_bytes_enqueue = Enqueue } = State) -> Bytes = message_size(Msg), @@ -1394,6 +1492,8 @@ add_bytes_return(Msg, #state{msg_bytes_checkout = Checkout, message_size(#basic_message{content = Content}) -> #content{payload_fragments_rev = PFR} = Content, iolist_size(PFR); +message_size({'$prefix_msg', B}) -> + B; message_size(B) when is_binary(B) -> byte_size(B); message_size(Msg) -> @@ -1554,7 +1654,7 @@ enq_enq_deq_deq_settle_test() -> {State3, {dequeue, {0, {_, first}}}, [{monitor, _, _}]} = apply(meta(3), make_checkout(Cid, {dequeue, unsettled}, #{}), State2), - {_State4, {dequeue, empty}, _} = + {_State4, {dequeue, empty}} = apply(meta(4), make_checkout(Cid, {dequeue, unsettled}, #{}), State3), ok. @@ -1599,7 +1699,7 @@ release_cursor_test() -> checkout_enq_settle_test() -> Cid = {?FUNCTION_NAME, self()}, - {State1, [{monitor, _, _}]} = check(Cid, 1, test_init(test)), + {State1, [{monitor, _, _} | _]} = check(Cid, 1, test_init(test)), {State2, Effects0} = enq(2, 1, first, State1), ?ASSERT_EFF({send_msg, _, {delivery, ?FUNCTION_NAME, @@ -1614,7 +1714,7 @@ checkout_enq_settle_test() -> out_of_order_enqueue_test() -> Cid = {?FUNCTION_NAME, self()}, - {State1, [{monitor, _, _}]} = check_n(Cid, 5, 5, test_init(test)), + {State1, [{monitor, _, _} | _]} = check_n(Cid, 5, 5, test_init(test)), {State2, Effects2} = enq(2, 1, first, State1), ?ASSERT_EFF({send_msg, _, {delivery, _, [{_, {_, first}}]}, _}, Effects2), % assert monitor was set up @@ -1644,7 +1744,7 @@ out_of_order_first_enqueue_test() -> duplicate_enqueue_test() -> Cid = {<<"duplicate_enqueue_test">>, self()}, - {State1, [{monitor, _, _}]} = check_n(Cid, 5, 5, test_init(test)), + {State1, [{monitor, _, _} | _]} = check_n(Cid, 5, 5, test_init(test)), {State2, Effects2} = enq(2, 1, first, State1), ?ASSERT_EFF({send_msg, _, {delivery, _, [{_, {_, first}}]}, _}, Effects2), {_State3, Effects3} = enq(3, 1, first, State2), @@ -1663,10 +1763,10 @@ return_checked_out_test() -> {State0, [_, _]} = enq(1, 1, first, test_init(test)), {State1, [_Monitor, {send_msg, _, {delivery, _, [{MsgId, _}]}, ra_event}, - {aux, active} + {aux, active} | _ ]} = check(Cid, 2, State0), % return - {_State2, _, [_, _]} = apply(meta(3), make_return(Cid, [MsgId]), State1), + {_State2, _, [_]} = apply(meta(3), make_return(Cid, [MsgId]), State1), ok. return_auto_checked_out_test() -> @@ -1695,7 +1795,8 @@ cancelled_checkout_out_test() -> {State1, _} = check_auto(Cid, 2, State0), % cancelled checkout should return all pending messages to queue {State2, _, _} = apply(meta(3), make_checkout(Cid, cancel, #{}), State1), - ?assertEqual(2, maps:size(State2#state.messages)), + ?assertEqual(1, maps:size(State2#state.messages)), + ?assertEqual(1, lqueue:len(State2#state.returns)), {State3, {dequeue, {0, {_, first}}}, _} = apply(meta(3), make_checkout(Cid, {dequeue, settled}, #{}), State2), @@ -1748,14 +1849,14 @@ down_with_noconnection_returns_unack_test() -> ?assertEqual(0, maps:size(State1#state.messages)), ?assertEqual(0, lqueue:len(State1#state.returns)), {State2a, _, _} = apply(meta(3), {down, Pid, noconnection}, State1), - ?assertEqual(1, maps:size(State2a#state.messages)), + ?assertEqual(0, maps:size(State2a#state.messages)), ?assertEqual(1, lqueue:len(State2a#state.returns)), ok. down_with_noproc_enqueuer_is_cleaned_up_test() -> State00 = test_init(test), Pid = spawn(fun() -> ok end), - {State0, _, Effects0} = apply(meta(1), {enqueue, Pid, 1, first}, State00), + {State0, _, Effects0} = apply(meta(1), make_enqueue(Pid, 1, first), State00), ?ASSERT_EFF({monitor, process, _}, Effects0), {State1, _, _} = apply(meta(3), {down, Pid, noproc}, State0), % ensure there are no enqueuers @@ -2116,7 +2217,7 @@ single_active_consumer_test() -> % adding some consumers AddConsumer = fun(CTag, State) -> {NewState, _, _} = apply( - #{}, + meta(1), #checkout{spec = {once, 1, simple_prefetch}, meta = #{}, consumer_id = {CTag, self()}}, @@ -2134,7 +2235,8 @@ single_active_consumer_test() -> ?assertNotEqual(false, lists:keyfind({<<"ctag4">>, self()}, 1, State1#state.waiting_consumers)), % cancelling a waiting consumer - {State2, _, Effects1} = apply(#{}, #checkout{spec = cancel, consumer_id = {<<"ctag3">>, self()}}, State1), + {State2, _, Effects1} = apply(meta(2), + #checkout{spec = cancel, consumer_id = {<<"ctag3">>, self()}}, State1), % the active consumer should still be in place ?assertEqual(1, map_size(State2#state.consumers)), ?assert(maps:is_key({<<"ctag1">>, self()}, State2#state.consumers)), @@ -2146,7 +2248,7 @@ single_active_consumer_test() -> ?assertEqual(1, length(Effects1)), % cancelling the active consumer - {State3, _, Effects2} = apply(#{}, #checkout{spec = cancel, consumer_id = {<<"ctag1">>, self()}}, State2), + {State3, _, Effects2} = apply(meta(3), #checkout{spec = cancel, consumer_id = {<<"ctag1">>, self()}}, State2), % the second registered consumer is now the active one ?assertEqual(1, map_size(State3#state.consumers)), ?assert(maps:is_key({<<"ctag2">>, self()}, State3#state.consumers)), @@ -2157,7 +2259,7 @@ single_active_consumer_test() -> ?assertEqual(2, length(Effects2)), % cancelling the active consumer - {State4, _, Effects3} = apply(#{}, #checkout{spec = cancel, consumer_id = {<<"ctag2">>, self()}}, State3), + {State4, _, Effects3} = apply(meta(4), #checkout{spec = cancel, consumer_id = {<<"ctag2">>, self()}}, State3), % the last waiting consumer became the active one ?assertEqual(1, map_size(State4#state.consumers)), ?assert(maps:is_key({<<"ctag4">>, self()}, State4#state.consumers)), @@ -2167,7 +2269,7 @@ single_active_consumer_test() -> ?assertEqual(2, length(Effects3)), % cancelling the last consumer - {State5, _, Effects4} = apply(#{}, #checkout{spec = cancel, consumer_id = {<<"ctag4">>, self()}}, State4), + {State5, _, Effects4} = apply(meta(5), #checkout{spec = cancel, consumer_id = {<<"ctag4">>, self()}}, State4), % no active consumer anymore ?assertEqual(0, map_size(State5#state.consumers)), % still nothing in the waiting list @@ -2192,7 +2294,7 @@ single_active_consumer_cancel_consumer_when_channel_is_down_test() -> % adding some consumers AddConsumer = fun({CTag, ChannelId}, State) -> {NewState, _, _} = apply( - #{}, + #{index => 1}, #checkout{spec = {once, 1, simple_prefetch}, meta = #{}, consumer_id = {CTag, ChannelId}}, @@ -2203,16 +2305,17 @@ single_active_consumer_cancel_consumer_when_channel_is_down_test() -> [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]), % the channel of the active consumer goes down - {State2, _, Effects} = apply(#{}, {down, Pid1, doesnotmatter}, State1), + {State2, _, Effects} = apply(#{index => 2}, {down, Pid1, doesnotmatter}, State1), % fell back to another consumer ?assertEqual(1, map_size(State2#state.consumers)), % there are still waiting consumers ?assertEqual(2, length(State2#state.waiting_consumers)), - % effects to unregister the consumer and to update the new active one (metrics) are there + % effects to unregister the consumer and + % to update the new active one (metrics) are there ?assertEqual(2, length(Effects)), % the channel of the active consumer and a waiting consumer goes down - {State3, _, Effects2} = apply(#{}, {down, Pid2, doesnotmatter}, State2), + {State3, _, Effects2} = apply(#{index => 3}, {down, Pid2, doesnotmatter}, State2), % fell back to another consumer ?assertEqual(1, map_size(State3#state.consumers)), % no more waiting consumer @@ -2221,7 +2324,7 @@ single_active_consumer_cancel_consumer_when_channel_is_down_test() -> ?assertEqual(3, length(Effects2)), % the last channel goes down - {State4, _, Effects3} = apply(#{}, {down, Pid3, doesnotmatter}, State3), + {State4, _, Effects3} = apply(#{index => 4}, {down, Pid3, doesnotmatter}, State3), % no more consumers ?assertEqual(0, map_size(State4#state.consumers)), ?assertEqual(0, length(State4#state.waiting_consumers)), @@ -2237,10 +2340,11 @@ single_active_consumer_mark_waiting_consumers_as_suspected_when_down_noconnnecti shadow_copy_interval => 0, single_active_consumer_on => true}), + Meta = #{index => 1}, % adding some consumers AddConsumer = fun(CTag, State) -> {NewState, _, _} = apply( - #{}, + Meta, #checkout{spec = {once, 1, simple_prefetch}, meta = #{}, consumer_id = {CTag, self()}}, @@ -2259,7 +2363,7 @@ single_active_consumer_mark_waiting_consumers_as_suspected_when_down_noconnnecti end, State2#state.waiting_consumers), % simulate node goes back up - {State3, _, _} = apply(#{}, {nodeup, node(self())}, State2), + {State3, _, _} = apply(#{index => 2}, {nodeup, node(self())}, State2), % all the waiting consumers should be un-suspected ?assertEqual(3, length(State3#state.waiting_consumers)), @@ -2281,10 +2385,11 @@ single_active_consumer_state_enter_leader_include_waiting_consumers_test() -> Pid2 = spawn(DummyFunction), Pid3 = spawn(DummyFunction), + Meta = #{index => 1}, % adding some consumers AddConsumer = fun({CTag, ChannelId}, State) -> {NewState, _, _} = apply( - #{}, + Meta, #checkout{spec = {once, 1, simple_prefetch}, meta = #{}, consumer_id = {CTag, ChannelId}}, @@ -2310,10 +2415,11 @@ single_active_consumer_state_enter_eol_include_waiting_consumers_test() -> Pid2 = spawn(DummyFunction), Pid3 = spawn(DummyFunction), + Meta = #{index => 1}, % adding some consumers AddConsumer = fun({CTag, ChannelId}, State) -> {NewState, _, _} = apply( - #{}, + Meta, #checkout{spec = {once, 1, simple_prefetch}, meta = #{}, consumer_id = {CTag, ChannelId}}, @@ -2333,11 +2439,11 @@ query_consumers_test() -> atom_to_binary(?FUNCTION_NAME, utf8)), shadow_copy_interval => 0, single_active_consumer_on => true}), - + Meta = #{index => 1}, % adding some consumers AddConsumer = fun(CTag, State) -> {NewState, _, _} = apply( - #{}, + Meta, #checkout{spec = {once, 1, simple_prefetch}, meta = #{}, consumer_id = {CTag, self()}}, diff --git a/src/rabbit_fifo_index.erl b/src/rabbit_fifo_index.erl index f8f414f453..184002611e 100644 --- a/src/rabbit_fifo_index.erl +++ b/src/rabbit_fifo_index.erl @@ -4,12 +4,14 @@ empty/0, fetch/2, append/3, + update_if_present/3, return/3, delete/2, size/1, smallest/1, next_key_after/2, - map/2 + map/2, + to_map/1 ]). -include_lib("ra/include/ra.hrl"). @@ -36,12 +38,22 @@ fetch(Key, #?MODULE{data = Data}) -> -spec append(integer(), term(), state()) -> state(). append(Key, Value, #?MODULE{data = Data, - smallest = Smallest, - largest = Largest} = State) + smallest = Smallest, + largest = Largest} = State) when Key > Largest orelse Largest =:= undefined -> State#?MODULE{data = maps:put(Key, Value, Data), - smallest = ra_lib:default(Smallest, Key), - largest = Key}. + smallest = ra_lib:default(Smallest, Key), + largest = Key}. + +-spec update_if_present(integer(), term(), state()) -> state(). +update_if_present(Key, Value, #?MODULE{data = Data} = State) -> + case Data of + #{Key := _} -> + State#?MODULE{data = maps:put(Key, Value, Data)}; + _ -> + State + end. + -spec return(integer(), term(), state()) -> state(). return(Key, Value, #?MODULE{data = Data, smallest = Smallest} = State) @@ -76,6 +88,10 @@ delete(Key, #?MODULE{data = Data} = State) -> size(#?MODULE{data = Data}) -> maps:size(Data). +-spec to_map(state()) -> #{integer() => term()}. +to_map(#?MODULE{data = Data}) -> + Data. + -spec smallest(state()) -> undefined | {integer(), term()}. smallest(#?MODULE{smallest = undefined}) -> undefined; diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index 5c0d1c0070..1f3aa1f38b 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -152,10 +152,15 @@ ra_machine(Q) -> ra_machine_config(Q = #amqqueue{name = QName, pid = {Name, _}}) -> + %% take the minimum value of the policy and the queue arg if present + MaxLength = args_policy_lookup(<<"max-length">>, fun min/2, Q), + MaxBytes = args_policy_lookup(<<"max-length-bytes">>, fun min/2, Q), #{name => Name, queue_resource => QName, dead_letter_handler => dlx_mfa(Q), become_leader_handler => {?MODULE, become_leader, [QName]}, + max_length => MaxLength, + max_bytes => MaxBytes, single_active_consumer_on => single_active_consumer_on(Q)}. single_active_consumer_on(#amqqueue{arguments = QArguments}) -> @@ -636,8 +641,10 @@ delete_member(#amqqueue{pid = {RaName, _}, name = QName}, Node) -> %%---------------------------------------------------------------------------- dlx_mfa(Q) -> - DLX = init_dlx(args_policy_lookup(<<"dead-letter-exchange">>, fun res_arg/2, Q), Q), - DLXRKey = args_policy_lookup(<<"dead-letter-routing-key">>, fun res_arg/2, Q), + DLX = init_dlx(args_policy_lookup(<<"dead-letter-exchange">>, + fun res_arg/2, Q), Q), + DLXRKey = args_policy_lookup(<<"dead-letter-routing-key">>, + fun res_arg/2, Q), {?MODULE, dead_letter_publish, [DLX, DLXRKey, Q#amqqueue.name]}. init_dlx(undefined, _Q) -> @@ -829,9 +836,8 @@ qnode({_, Node}) -> Node. check_invalid_arguments(QueueName, Args) -> - Keys = [<<"x-expires">>, <<"x-message-ttl">>, <<"x-max-length">>, - <<"x-max-length-bytes">>, <<"x-max-priority">>, <<"x-overflow">>, - <<"x-queue-mode">>], + Keys = [<<"x-expires">>, <<"x-message-ttl">>, + <<"x-max-priority">>, <<"x-queue-mode">>, <<"x-overflow">>], [case rabbit_misc:table_lookup(Args, Key) of undefined -> ok; _TypeVal -> rabbit_misc:protocol_error( diff --git a/test/dynamic_qq_SUITE.erl b/test/dynamic_qq_SUITE.erl index d1158ef07a..3357e6f74b 100644 --- a/test/dynamic_qq_SUITE.erl +++ b/test/dynamic_qq_SUITE.erl @@ -135,9 +135,8 @@ force_delete_if_no_consensus(Config) -> passive = true})), %% TODO implement a force delete BCh2 = rabbit_ct_client_helpers:open_channel(Config, B), - ?assertExit({{shutdown, - {connection_closing, {server_initiated_close, 541, _}}}, _}, - amqp_channel:call(BCh2, #'queue.delete'{queue = QName})), + ?assertMatch(#'queue.delete_ok'{}, + amqp_channel:call(BCh2, #'queue.delete'{queue = QName})), ok. takeover_on_failure(Config) -> diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl index dcba910a6a..48dac3ca57 100644 --- a/test/quorum_queue_SUITE.erl +++ b/test/quorum_queue_SUITE.erl @@ -22,6 +22,7 @@ -import(quorum_queue_utils, [wait_for_messages_ready/3, wait_for_messages_pending_ack/3, + wait_for_messages_total/3, dirty_query/3, ra_name/1]). @@ -37,6 +38,7 @@ all() -> groups() -> [ {single_node, [], all_tests()}, + {single_node, [], memory_tests()}, {unclustered, [], [ {cluster_size_2, [], [add_member]} ]}, @@ -51,6 +53,7 @@ groups() -> delete_member_not_found, delete_member] ++ all_tests()}, + {cluster_size_2, [], memory_tests()}, {cluster_size_3, [], [ declare_during_node_down, simple_confirm_availability_on_leader_change, @@ -61,7 +64,8 @@ groups() -> delete_declare, metrics_cleanup_on_leadership_takeover, metrics_cleanup_on_leader_crash, - consume_in_minority]}, + consume_in_minority + ]}, {cluster_size_5, [], [start_queue, start_queue_concurrent, quorum_cluster_size_3, @@ -126,6 +130,11 @@ all_tests() -> consume_redelivery_count, subscribe_redelivery_count, message_bytes_metrics, + queue_length_limit_drop_head + ]. + +memory_tests() -> + [ memory_alarm_rolls_wal ]. @@ -240,7 +249,9 @@ declare_args(Config) -> Ch = rabbit_ct_client_helpers:open_channel(Config, Server), LQ = ?config(queue_name, Config), - declare(Ch, LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}]), + declare(Ch, LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, + {<<"x-max-length">>, long, 2000}, + {<<"x-max-length-bytes">>, long, 2000}]), assert_queue_type(Server, LQ, quorum), DQ = <<"classic-declare-args-q">>, @@ -293,16 +304,6 @@ declare_invalid_args(Config) -> declare(rabbit_ct_client_helpers:open_channel(Config, Server), LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, {<<"x-message-ttl">>, long, 2000}])), - ?assertExit( - {{shutdown, {server_initiated_close, 406, _}}, _}, - declare(rabbit_ct_client_helpers:open_channel(Config, Server), - LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, - {<<"x-max-length">>, long, 2000}])), - ?assertExit( - {{shutdown, {server_initiated_close, 406, _}}, _}, - declare(rabbit_ct_client_helpers:open_channel(Config, Server), - LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, - {<<"x-max-length-bytes">>, long, 2000}])), ?assertExit( {{shutdown, {server_initiated_close, 406, _}}, _}, @@ -314,7 +315,7 @@ declare_invalid_args(Config) -> {{shutdown, {server_initiated_close, 406, _}}, _}, declare(rabbit_ct_client_helpers:open_channel(Config, Server), LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, - {<<"x-overflow">>, longstr, <<"drop-head">>}])), + {<<"x-overflow">>, longstr, <<"reject-publish">>}])), ?assertExit( {{shutdown, {server_initiated_close, 406, _}}, _}, @@ -1422,7 +1423,7 @@ metrics_cleanup_on_leadership_takeover(Config) -> _ -> false end end), - force_leader_change(Leader, Servers, QQ), + force_leader_change(Servers, QQ), wait_until(fun () -> [] =:= rpc:call(Leader, ets, lookup, [queue_coarse_metrics, QRes]) andalso [] =:= rpc:call(Leader, ets, lookup, [queue_metrics, QRes]) @@ -2151,6 +2152,32 @@ memory_alarm_rolls_wal(Config) -> timer:sleep(1000), ok. +queue_length_limit_drop_head(Config) -> + [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, + {<<"x-max-length">>, long, 1}])), + + RaName = ra_name(QQ), + ok = amqp_channel:cast(Ch, + #'basic.publish'{routing_key = QQ}, + #amqp_msg{props = #'P_basic'{delivery_mode = 2}, + payload = <<"msg1">>}), + ok = amqp_channel:cast(Ch, + #'basic.publish'{routing_key = QQ}, + #amqp_msg{props = #'P_basic'{delivery_mode = 2}, + payload = <<"msg2">>}), + wait_for_consensus(QQ, Config), + wait_for_messages_ready(Servers, RaName, 1), + wait_for_messages_pending_ack(Servers, RaName, 0), + wait_for_messages_total(Servers, RaName, 1), + ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"msg2">>}}, + amqp_channel:call(Ch, #'basic.get'{queue = QQ, + no_ack = true})). + %%---------------------------------------------------------------------------- declare(Ch, Q) -> @@ -2201,6 +2228,9 @@ filter_queues(Expected, Got) -> lists:member(K, Keys) end, Got). +publish_many(Ch, Queue, Count) -> + [publish(Ch, Queue) || _ <- lists:seq(1, Count)]. + publish(Ch, Queue) -> ok = amqp_channel:cast(Ch, #'basic.publish'{routing_key = Queue}, @@ -2268,14 +2298,16 @@ wait_until(Condition, N) -> wait_until(Condition, N - 1) end. -force_leader_change(Leader, Servers, Q) -> + +force_leader_change([Server | _] = Servers, Q) -> RaName = ra_name(Q), + {ok, _, {_, Leader}} = ra:members({RaName, Server}), [F1, _] = Servers -- [Leader], ok = rpc:call(F1, ra, trigger_election, [{RaName, F1}]), case ra:members({RaName, Leader}) of {ok, _, {_, Leader}} -> %% Leader has been re-elected - force_leader_change(Leader, Servers, Q); + force_leader_change(Servers, Q); {ok, _, _} -> %% Leader has changed ok @@ -2297,3 +2329,8 @@ get_message_bytes(Leader, QRes) -> _ -> [] end. + +wait_for_consensus(Name, Config) -> + Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + RaName = ra_name(Name), + {ok, _, _} = ra:members({RaName, Server}). diff --git a/test/quorum_queue_utils.erl b/test/quorum_queue_utils.erl index a216c220e6..6b820c7b5c 100644 --- a/test/quorum_queue_utils.erl +++ b/test/quorum_queue_utils.erl @@ -5,6 +5,7 @@ -export([ wait_for_messages_ready/3, wait_for_messages_pending_ack/3, + wait_for_messages_total/3, dirty_query/3, ra_name/1 ]). @@ -17,6 +18,10 @@ wait_for_messages_pending_ack(Servers, QName, Ready) -> wait_for_messages(Servers, QName, Ready, fun rabbit_fifo:query_messages_checked_out/1, 60). +wait_for_messages_total(Servers, QName, Total) -> + wait_for_messages(Servers, QName, Total, + fun rabbit_fifo:query_messages_total/1, 60). + wait_for_messages(Servers, QName, Number, Fun, 0) -> Msgs = dirty_query(Servers, QName, Fun), Totals = lists:map(fun(M) when is_map(M) -> @@ -28,8 +33,8 @@ wait_for_messages(Servers, QName, Number, Fun, 0) -> wait_for_messages(Servers, QName, Number, Fun, N) -> Msgs = dirty_query(Servers, QName, Fun), ct:pal("Got messages ~p", [Msgs]), - case lists:all(fun(M) when is_map(M) -> - maps:size(M) == Number; + case lists:all(fun(C) when is_integer(C) -> + C == Number; (_) -> false end, Msgs) of diff --git a/test/rabbit_fifo_SUITE.erl b/test/rabbit_fifo_SUITE.erl index 0512e8161a..9f1f3a4797 100644 --- a/test/rabbit_fifo_SUITE.erl +++ b/test/rabbit_fifo_SUITE.erl @@ -300,6 +300,7 @@ returns_after_down(Config) -> Self ! checkout_done end), receive checkout_done -> ok after 1000 -> exit(checkout_done_timeout) end, + timer:sleep(1000), % message should be available for dequeue {ok, {_, {_, msg1}}, _} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F2), ra:stop_server(ServerId), @@ -481,15 +482,15 @@ test_queries(Config) -> F0 = rabbit_fifo_client:init(ClusterName, [ServerId], 4), {ok, _} = rabbit_fifo_client:checkout(<<"tag">>, 1, undefined, F0), {ok, {_, Ready}, _} = ra:local_query(ServerId, - fun rabbit_fifo:query_messages_ready/1), - ?assertEqual(1, maps:size(Ready)), + fun rabbit_fifo:query_messages_ready/1), + ?assertEqual(1, Ready), ct:pal("Ready ~w~n", [Ready]), {ok, {_, Checked}, _} = ra:local_query(ServerId, - fun rabbit_fifo:query_messages_checked_out/1), - ?assertEqual(1, maps:size(Checked)), + fun rabbit_fifo:query_messages_checked_out/1), + ?assertEqual(1, Checked), ct:pal("Checked ~w~n", [Checked]), {ok, {_, Processes}, _} = ra:local_query(ServerId, - fun rabbit_fifo:query_processes/1), + fun rabbit_fifo:query_processes/1), ct:pal("Processes ~w~n", [Processes]), ?assertEqual(2, length(Processes)), P ! stop, diff --git a/test/rabbit_fifo_prop_SUITE.erl b/test/rabbit_fifo_prop_SUITE.erl index a8604b46af..5643da1991 100644 --- a/test/rabbit_fifo_prop_SUITE.erl +++ b/test/rabbit_fifo_prop_SUITE.erl @@ -25,7 +25,17 @@ all_tests() -> scenario1, scenario2, scenario3, - scenario4 + scenario4, + scenario5, + scenario6, + scenario7, + scenario8, + scenario9, + scenario10, + scenario11, + scenario12, + scenario13, + scenario14 ]. groups() -> @@ -73,7 +83,7 @@ scenario1(_Config) -> make_return(C2, [1]), %% E2 in returns E1 with C2 make_settle(C2, [2]) %% E2 with C2 ], - run_snapshot_test(?FUNCTION_NAME, Commands), + run_snapshot_test(#{name => ?FUNCTION_NAME}, Commands), ok. scenario2(_Config) -> @@ -88,7 +98,7 @@ scenario2(_Config) -> make_settle(C1, [0]), make_settle(C2, [0]) ], - run_snapshot_test(?FUNCTION_NAME, Commands), + run_snapshot_test(#{name => ?FUNCTION_NAME}, Commands), ok. scenario3(_Config) -> @@ -102,7 +112,7 @@ scenario3(_Config) -> make_settle(C1, [1]), make_settle(C1, [2]) ], - run_snapshot_test(?FUNCTION_NAME, Commands), + run_snapshot_test(#{name => ?FUNCTION_NAME}, Commands), ok. scenario4(_Config) -> @@ -112,19 +122,147 @@ scenario4(_Config) -> make_enqueue(E,1,msg), make_settle(C1, [0]) ], - run_snapshot_test(?FUNCTION_NAME, Commands), + run_snapshot_test(#{name => ?FUNCTION_NAME}, Commands), + ok. + +scenario5(_Config) -> + C1 = {<<>>, c:pid(0,505,0)}, + E = c:pid(0,465,9), + Commands = [make_enqueue(E,1,<<0>>), + make_checkout(C1, {auto,1,simple_prefetch}), + make_enqueue(E,2,<<>>), + make_settle(C1,[0])], + run_snapshot_test(#{name => ?FUNCTION_NAME}, Commands), + ok. + +scenario6(_Config) -> + E = c:pid(0,465,9), + Commands = [make_enqueue(E,1,<<>>), %% 1 msg on queue - snap: prefix 1 + make_enqueue(E,2,<<>>) %% 1. msg on queue - snap: prefix 1 + ], + run_snapshot_test(#{name => ?FUNCTION_NAME, + max_length => 1}, Commands), + ok. + +scenario7(_Config) -> + C1 = {<<>>, c:pid(0,208,0)}, + E = c:pid(0,188,0), + Commands = [ + make_enqueue(E,1,<<>>), + make_checkout(C1, {auto,1,simple_prefetch}), + make_enqueue(E,2,<<>>), + make_enqueue(E,3,<<>>), + make_settle(C1,[0])], + run_snapshot_test(#{name => ?FUNCTION_NAME, + max_length => 1}, Commands), + ok. + +scenario8(_Config) -> + C1 = {<<>>, c:pid(0,208,0)}, + E = c:pid(0,188,0), + Commands = [ + make_enqueue(E,1,<<>>), + make_enqueue(E,2,<<>>), + make_checkout(C1, {auto,1,simple_prefetch}), + % make_checkout(C1, cancel), + {down, E, noconnection}, + make_settle(C1, [0])], + run_snapshot_test(#{name => ?FUNCTION_NAME, + max_length => 1}, Commands), + ok. + +scenario9(_Config) -> + E = c:pid(0,188,0), + Commands = [ + make_enqueue(E,1,<<>>), + make_enqueue(E,2,<<>>), + make_enqueue(E,3,<<>>)], + run_snapshot_test(#{name => ?FUNCTION_NAME, + max_length => 1}, Commands), + ok. + +scenario10(_Config) -> + C1 = {<<>>, c:pid(0,208,0)}, + E = c:pid(0,188,0), + Commands = [ + make_checkout(C1, {auto,1,simple_prefetch}), + make_enqueue(E,1,<<>>), + make_settle(C1, [0]) + ], + run_snapshot_test(#{name => ?FUNCTION_NAME, + max_length => 1}, Commands), + ok. + +scenario11(_Config) -> + C1 = {<<>>, c:pid(0,215,0)}, + E = c:pid(0,217,0), + Commands = [ + make_enqueue(E,1,<<>>), + make_checkout(C1, {auto,1,simple_prefetch}), + make_checkout(C1, cancel), + make_enqueue(E,2,<<>>), + make_checkout(C1, {auto,1,simple_prefetch}), + make_settle(C1, [0]), + make_checkout(C1, cancel) + ], + run_snapshot_test(#{name => ?FUNCTION_NAME, + max_length => 2}, Commands), + ok. + +scenario12(_Config) -> + E = c:pid(0,217,0), + Commands = [make_enqueue(E,1,<<0>>), + make_enqueue(E,2,<<0>>), + make_enqueue(E,3,<<0>>)], + run_snapshot_test(#{name => ?FUNCTION_NAME, + max_bytes => 2}, Commands), + ok. + +scenario13(_Config) -> + E = c:pid(0,217,0), + Commands = [make_enqueue(E,1,<<0>>), + make_enqueue(E,2,<<>>), + make_enqueue(E,3,<<>>), + make_enqueue(E,4,<<>>) + ], + run_snapshot_test(#{name => ?FUNCTION_NAME, + max_length => 2}, Commands), + ok. + +scenario14(_Config) -> + E = c:pid(0,217,0), + Commands = [make_enqueue(E,1,<<0,0>>)], + run_snapshot_test(#{name => ?FUNCTION_NAME, + max_bytes => 1}, Commands), ok. snapshots(_Config) -> run_proper( fun () -> - ?FORALL(O, ?LET(Ops, log_gen(), expand(Ops)), - test1_prop(O)) - end, [], 1000). - -test1_prop(Commands) -> - ct:pal("Commands: ~p~n", [Commands]), - try run_snapshot_test(?FUNCTION_NAME, Commands) of + ?FORALL({Length, Bytes, SingleActiveConsumer}, + frequency([{10, {0, 0, false}}, + {5, {non_neg_integer(), non_neg_integer(), + boolean()}}]), + ?FORALL(O, ?LET(Ops, log_gen(200), expand(Ops)), + collect({Length, Bytes}, + snapshots_prop( + config(?FUNCTION_NAME, + Length, Bytes, + SingleActiveConsumer), O)))) + end, [], 2000). + +config(Name, Length, Bytes, SingleActive) -> + #{name => Name, + max_length => map_max(Length), + max_bytes => map_max(Bytes), + single_active_consumer_on => SingleActive}. + +map_max(0) -> undefined; +map_max(N) -> N. + +snapshots_prop(Conf, Commands) -> + ct:pal("Commands: ~p~nConf~p~n", [Commands, Conf]), + try run_snapshot_test(Conf, Commands) of _ -> true catch Err -> @@ -132,10 +270,10 @@ test1_prop(Commands) -> false end. -log_gen() -> +log_gen(Size) -> ?LET(EPids, vector(2, pid_gen()), ?LET(CPids, vector(2, pid_gen()), - resize(200, + resize(Size, list( frequency( [{20, enqueue_gen(oneof(EPids))}, @@ -157,15 +295,17 @@ down_gen(Pid) -> ?LET(E, {down, Pid, oneof([noconnection, noproc])}, E). enqueue_gen(Pid) -> - ?LET(E, {enqueue, Pid, frequency([{10, enqueue}, - {1, delay}])}, E). + ?LET(E, {enqueue, Pid, + frequency([{10, enqueue}, + {1, delay}]), + binary()}, E). checkout_cancel_gen(Pid) -> {checkout, Pid, cancel}. checkout_gen(Pid) -> %% pid, tag, prefetch - ?LET(C, {checkout, {binary(), Pid}, choose(1, 10)}, C). + ?LET(C, {checkout, {binary(), Pid}, choose(1, 100)}, C). -record(t, {state = rabbit_fifo:init(#{name => proper, @@ -193,9 +333,10 @@ expand(Ops) -> lists:reverse(Log). -handle_op({enqueue, Pid, When}, #t{enqueuers = Enqs0, - down = Down, - effects = Effs} = T) -> +handle_op({enqueue, Pid, When, Data}, + #t{enqueuers = Enqs0, + down = Down, + effects = Effs} = T) -> case Down of #{Pid := noproc} -> %% if it's a noproc then it cannot exist - can it? @@ -204,13 +345,12 @@ handle_op({enqueue, Pid, When}, #t{enqueuers = Enqs0, _ -> Enqs = maps:update_with(Pid, fun (Seq) -> Seq + 1 end, 1, Enqs0), MsgSeq = maps:get(Pid, Enqs), - Cmd = rabbit_fifo:make_enqueue(Pid, MsgSeq, msg), + Cmd = rabbit_fifo:make_enqueue(Pid, MsgSeq, Data), case When of enqueue -> do_apply(Cmd, T#t{enqueuers = Enqs}); delay -> %% just put the command on the effects queue - ct:pal("delaying ~w", [Cmd]), T#t{effects = queue:in(Cmd, Effs)} end end; @@ -308,7 +448,6 @@ enq_effs([{send_msg, P, {delivery, CTag, Msgs}, ra_event} | Rem], Q) -> Cmd = rabbit_fifo:make_settle({CTag, P}, MsgIds), enq_effs(Rem, queue:in(Cmd, Q)); enq_effs([_ | Rem], Q) -> - % ct:pal("enq_effs dropping ~w~n", [E]), enq_effs(Rem, Q). @@ -323,29 +462,40 @@ run_proper(Fun, Args, NumTests) -> (F, A) -> ct:pal(?LOW_IMPORTANCE, F, A) end}])). -run_snapshot_test(Name, Commands) -> +run_snapshot_test(Conf, Commands) -> %% create every incremental permuation of the commands lists %% and run the snapshot tests against that [begin % ?debugFmt("~w running command to ~w~n", [?FUNCTION_NAME, lists:last(C)]), - run_snapshot_test0(Name, C) + run_snapshot_test0(Conf, C) end || C <- prefixes(Commands, 1, [])]. -run_snapshot_test0(Name, Commands) -> +run_snapshot_test0(Conf, Commands) -> Indexes = lists:seq(1, length(Commands)), Entries = lists:zip(Indexes, Commands), - {State, Effects} = run_log(test_init(Name), Entries), + {State, Effects} = run_log(test_init(Conf), Entries), + % ct:pal("beginning snapshot test run for ~w numn commands ~b", + % [maps:get(name, Conf), length(Commands)]), [begin + %% drop all entries below and including the snapshot Filtered = lists:dropwhile(fun({X, _}) when X =< SnapIdx -> true; (_) -> false end, Entries), {S, _} = run_log(SnapState, Filtered), % assert log can be restored from any release cursor index - ?assertEqual(State, S) + case S of + State -> ok; + _ -> + ct:pal("Snapshot tests failed run log:~n" + "~p~n from ~n~p~n Entries~n~p~n", + [Filtered, SnapState, Entries]), + ?assertEqual(State, S) + end end || {release_cursor, SnapIdx, SnapState} <- Effects], ok. +%% transforms [1,2,3] into [[1,2,3], [1,2], [1]] prefixes(Source, N, Acc) when N > length(Source) -> lists:reverse(Acc); prefixes(Source, N, Acc) -> @@ -364,11 +514,12 @@ run_log(InitState, Entries) -> end end, {InitState, []}, Entries). -test_init(Name) -> - rabbit_fifo:init(#{name => Name, - queue_resource => blah, - shadow_copy_interval => 0, - metrics_handler => {?MODULE, metrics_handler, []}}). +test_init(Conf) -> + Default = #{queue_resource => blah, + shadow_copy_interval => 0, + metrics_handler => {?MODULE, metrics_handler, []}}, + rabbit_fifo:init(maps:merge(Default, Conf)). + meta(Idx) -> #{index => Idx, term => 1}. diff --git a/test/unit_inbroker_parallel_SUITE.erl b/test/unit_inbroker_parallel_SUITE.erl index 0343e7d136..581440d179 100644 --- a/test/unit_inbroker_parallel_SUITE.erl +++ b/test/unit_inbroker_parallel_SUITE.erl @@ -35,9 +35,15 @@ all() -> ]. groups() -> - MaxLengthTests = [max_length_drop_head, + MaxLengthTests = [max_length_default, + max_length_bytes_default, + max_length_drop_head, + max_length_bytes_drop_head, max_length_reject_confirm, - max_length_drop_publish], + max_length_bytes_reject_confirm, + max_length_drop_publish, + max_length_drop_publish_requeue, + max_length_bytes_drop_publish], [ {parallel_tests, [parallel], [ amqp_connection_refusal, @@ -59,11 +65,16 @@ groups() -> set_disk_free_limit_command, set_vm_memory_high_watermark_command, topic_matching, + max_message_size, + {queue_max_length, [], [ - {max_length_simple, [], MaxLengthTests}, - {max_length_mirrored, [], MaxLengthTests}]}, - max_message_size - ]} + {max_length_classic, [], MaxLengthTests}, + {max_length_quorum, [], [max_length_default, + max_length_bytes_default] + }, + {max_length_mirrored, [], MaxLengthTests} + ]} + ]} ]. suite() -> @@ -82,10 +93,23 @@ init_per_suite(Config) -> end_per_suite(Config) -> rabbit_ct_helpers:run_teardown_steps(Config). +init_per_group(max_length_classic, Config) -> + rabbit_ct_helpers:set_config( + Config, + [{queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]}, + {queue_durable, false}]); +init_per_group(max_length_quorum, Config) -> + rabbit_ct_helpers:set_config( + Config, + [{queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}]}, + {queue_durable, true}]); init_per_group(max_length_mirrored, Config) -> rabbit_ct_broker_helpers:set_ha_policy(Config, 0, <<"^max_length.*queue">>, <<"all">>, [{<<"ha-sync-mode">>, <<"automatic">>}]), - Config1 = rabbit_ct_helpers:set_config(Config, [{is_mirrored, true}]), + Config1 = rabbit_ct_helpers:set_config( + Config, [{is_mirrored, true}, + {queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]}, + {queue_durable, false}]), rabbit_ct_helpers:run_steps(Config1, []); init_per_group(Group, Config) -> case lists:member({group, Group}, all()) of @@ -132,29 +156,22 @@ end_per_group(Group, Config) -> end. init_per_testcase(Testcase, Config) -> - rabbit_ct_helpers:testcase_started(Config, Testcase). - -end_per_testcase(max_length_drop_head = Testcase, Config) -> + Group = proplists:get_value(name, ?config(tc_group_properties, Config)), + Q = rabbit_data_coercion:to_binary(io_lib:format("~p_~p", [Group, Testcase])), + Config1 = rabbit_ct_helpers:set_config(Config, [{queue_name, Q}]), + rabbit_ct_helpers:testcase_started(Config1, Testcase). + +end_per_testcase(Testcase, Config) + when Testcase == max_length_drop_publish; Testcase == max_length_bytes_drop_publish; + Testcase == max_length_drop_publish_requeue; + Testcase == max_length_reject_confirm; Testcase == max_length_bytes_reject_confirm; + Testcase == max_length_drop_head; Testcase == max_length_bytes_drop_head; + Testcase == max_length_default; Testcase == max_length_bytes_default -> {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), - amqp_channel:call(Ch, #'queue.delete'{queue = <<"max_length_drop_head_queue">>}), - amqp_channel:call(Ch, #'queue.delete'{queue = <<"max_length_default_drop_head_queue">>}), - amqp_channel:call(Ch, #'queue.delete'{queue = <<"max_length_bytes_drop_head_queue">>}), - amqp_channel:call(Ch, #'queue.delete'{queue = <<"max_length_bytes_default_drop_head_queue">>}), + amqp_channel:call(Ch, #'queue.delete'{queue = ?config(queue_name, Config)}), rabbit_ct_client_helpers:close_channels_and_connection(Config, 0), rabbit_ct_helpers:testcase_finished(Config, Testcase); -end_per_testcase(max_length_reject_confirm = Testcase, Config) -> - {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), - amqp_channel:call(Ch, #'queue.delete'{queue = <<"max_length_reject_queue">>}), - amqp_channel:call(Ch, #'queue.delete'{queue = <<"max_length_bytes_reject_queue">>}), - rabbit_ct_client_helpers:close_channels_and_connection(Config, 0), - rabbit_ct_helpers:testcase_finished(Config, Testcase); -end_per_testcase(max_length_drop_publish = Testcase, Config) -> - {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), - amqp_channel:call(Ch, #'queue.delete'{queue = <<"max_length_drop_publish_queue">>}), - amqp_channel:call(Ch, #'queue.delete'{queue = <<"max_length_bytes_drop_publish_queue">>}), - rabbit_ct_client_helpers:close_channels_and_connection(Config, 0), - rabbit_ct_helpers:testcase_finished(Config, Testcase); end_per_testcase(Testcase, Config) -> rabbit_ct_helpers:testcase_finished(Config, Testcase). @@ -1159,43 +1176,66 @@ set_vm_memory_high_watermark_command1(_Config) -> ) end. -max_length_drop_head(Config) -> +max_length_bytes_drop_head(Config) -> + max_length_bytes_drop_head(Config, [{<<"x-overflow">>, longstr, <<"drop-head">>}]). + +max_length_bytes_default(Config) -> + max_length_bytes_drop_head(Config, []). + +max_length_bytes_drop_head(Config, ExtraArgs) -> {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), - QName = <<"max_length_drop_head_queue">>, - QNameDefault = <<"max_length_default_drop_head_queue">>, - QNameBytes = <<"max_length_bytes_drop_head_queue">>, - QNameDefaultBytes = <<"max_length_bytes_default_drop_head_queue">>, + Args = ?config(queue_args, Config), + Durable = ?config(queue_durable, Config), + QName = ?config(queue_name, Config), - MaxLengthArgs = [{<<"x-max-length">>, long, 1}], MaxLengthBytesArgs = [{<<"x-max-length-bytes">>, long, 100}], - OverflowArgs = [{<<"x-overflow">>, longstr, <<"drop-head">>}], - #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = MaxLengthArgs ++ OverflowArgs}), - #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QNameDefault, arguments = MaxLengthArgs}), - #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QNameBytes, arguments = MaxLengthBytesArgs ++ OverflowArgs}), - #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QNameDefaultBytes, arguments = MaxLengthBytesArgs}), - - check_max_length_drops_head(Config, QName, Ch, <<"1">>, <<"2">>, <<"3">>), - check_max_length_drops_head(Config, QNameDefault, Ch, <<"1">>, <<"2">>, <<"3">>), + #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = MaxLengthBytesArgs ++ Args ++ ExtraArgs, durable = Durable}), %% 80 bytes payload Payload1 = << <<"1">> || _ <- lists:seq(1, 80) >>, Payload2 = << <<"2">> || _ <- lists:seq(1, 80) >>, Payload3 = << <<"3">> || _ <- lists:seq(1, 80) >>, - check_max_length_drops_head(Config, QName, Ch, Payload1, Payload2, Payload3), - check_max_length_drops_head(Config, QNameDefault, Ch, Payload1, Payload2, Payload3). + check_max_length_drops_head(Config, QName, Ch, Payload1, Payload2, Payload3). + +max_length_drop_head(Config) -> + max_length_drop_head(Config, [{<<"x-overflow">>, longstr, <<"drop-head">>}]). + +max_length_default(Config) -> + %% Defaults to drop_head + max_length_drop_head(Config, []). + +max_length_drop_head(Config, ExtraArgs) -> + {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + Args = ?config(queue_args, Config), + Durable = ?config(queue_durable, Config), + QName = ?config(queue_name, Config), + + MaxLengthArgs = [{<<"x-max-length">>, long, 1}], + #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = MaxLengthArgs ++ Args ++ ExtraArgs, durable = Durable}), + + check_max_length_drops_head(Config, QName, Ch, <<"1">>, <<"2">>, <<"3">>). max_length_reject_confirm(Config) -> {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), - QName = <<"max_length_reject_queue">>, - QNameBytes = <<"max_length_bytes_reject_queue">>, + Args = ?config(queue_args, Config), + QName = ?config(queue_name, Config), + Durable = ?config(queue_durable, Config), MaxLengthArgs = [{<<"x-max-length">>, long, 1}], - MaxLengthBytesArgs = [{<<"x-max-length-bytes">>, long, 100}], OverflowArgs = [{<<"x-overflow">>, longstr, <<"reject-publish">>}], - #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = MaxLengthArgs ++ OverflowArgs}), - #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QNameBytes, arguments = MaxLengthBytesArgs ++ OverflowArgs}), + #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = MaxLengthArgs ++ OverflowArgs ++ Args, durable = Durable}), #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), check_max_length_drops_publish(Config, QName, Ch, <<"1">>, <<"2">>, <<"3">>), - check_max_length_rejects(Config, QName, Ch, <<"1">>, <<"2">>, <<"3">>), + check_max_length_rejects(Config, QName, Ch, <<"1">>, <<"2">>, <<"3">>). + +max_length_bytes_reject_confirm(Config) -> + {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + Args = ?config(queue_args, Config), + QNameBytes = ?config(queue_name, Config), + Durable = ?config(queue_durable, Config), + MaxLengthBytesArgs = [{<<"x-max-length-bytes">>, long, 100}], + OverflowArgs = [{<<"x-overflow">>, longstr, <<"reject-publish">>}], + #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QNameBytes, arguments = MaxLengthBytesArgs ++ OverflowArgs ++ Args, durable = Durable}), + #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), %% 80 bytes payload Payload1 = << <<"1">> || _ <- lists:seq(1, 80) >>, @@ -1207,15 +1247,55 @@ max_length_reject_confirm(Config) -> max_length_drop_publish(Config) -> {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), - QName = <<"max_length_drop_publish_queue">>, - QNameBytes = <<"max_length_bytes_drop_publish_queue">>, + Args = ?config(queue_args, Config), + Durable = ?config(queue_durable, Config), + QName = ?config(queue_name, Config), MaxLengthArgs = [{<<"x-max-length">>, long, 1}], - MaxLengthBytesArgs = [{<<"x-max-length-bytes">>, long, 100}], OverflowArgs = [{<<"x-overflow">>, longstr, <<"reject-publish">>}], - #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = MaxLengthArgs ++ OverflowArgs}), - #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QNameBytes, arguments = MaxLengthBytesArgs ++ OverflowArgs}), + #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = MaxLengthArgs ++ OverflowArgs ++ Args, durable = Durable}), %% If confirms are not enable, publishes will still be dropped in reject-publish mode. - check_max_length_drops_publish(Config, QName, Ch, <<"1">>, <<"2">>, <<"3">>), + check_max_length_drops_publish(Config, QName, Ch, <<"1">>, <<"2">>, <<"3">>). + +max_length_drop_publish_requeue(Config) -> + {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + Args = ?config(queue_args, Config), + Durable = ?config(queue_durable, Config), + QName = ?config(queue_name, Config), + MaxLengthArgs = [{<<"x-max-length">>, long, 1}], + OverflowArgs = [{<<"x-overflow">>, longstr, <<"reject-publish">>}], + #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = MaxLengthArgs ++ OverflowArgs ++ Args, durable = Durable}), + %% If confirms are not enable, publishes will still be dropped in reject-publish mode. + check_max_length_requeue(Config, QName, Ch, <<"1">>, <<"2">>). + +check_max_length_requeue(Config, QName, Ch, Payload1, Payload2) -> + sync_mirrors(QName, Config), + #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), + %% A single message is published and consumed + amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}), + wait_for_consensus(QName, Config), + {#'basic.get_ok'{delivery_tag = DeliveryTag}, + #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), + #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), + + %% Another message is published + amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}), + wait_for_consensus(QName, Config), + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, + multiple = false, + requeue = true}), + wait_for_consensus(QName, Config), + {#'basic.get_ok'{}, #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), + {#'basic.get_ok'{}, #amqp_msg{payload = Payload2}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), + #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}). + +max_length_bytes_drop_publish(Config) -> + {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + Args = ?config(queue_args, Config), + Durable = ?config(queue_durable, Config), + QNameBytes = ?config(queue_name, Config), + MaxLengthBytesArgs = [{<<"x-max-length-bytes">>, long, 100}], + OverflowArgs = [{<<"x-overflow">>, longstr, <<"reject-publish">>}], + #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QNameBytes, arguments = MaxLengthBytesArgs ++ OverflowArgs ++ Args, durable = Durable}), %% 80 bytes payload Payload1 = << <<"1">> || _ <- lists:seq(1, 80) >>, @@ -1229,22 +1309,38 @@ check_max_length_drops_publish(Config, QName, Ch, Payload1, Payload2, Payload3) #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), %% A single message is published and consumed amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}), + wait_for_consensus(QName, Config), {#'basic.get_ok'{}, #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), %% Message 2 is dropped, message 1 stays amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}), + wait_for_consensus(QName, Config), amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}), + wait_for_consensus(QName, Config), {#'basic.get_ok'{}, #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), %% Messages 2 and 3 are dropped, message 1 stays amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}), + wait_for_consensus(QName, Config), amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}), + wait_for_consensus(QName, Config), amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload3}), + wait_for_consensus(QName, Config), {#'basic.get_ok'{}, #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}). +wait_for_consensus(QName, Config) -> + case lists:keyfind(<<"x-queue-type">>, 1, ?config(queue_args, Config)) of + {_, _, <<"quorum">>} -> + Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + RaName = binary_to_atom(<<"%2F_", QName/binary>>, utf8), + {ok, _, _} = ra:members({RaName, Server}); + _ -> + ok + end. + check_max_length_rejects(Config, QName, Ch, Payload1, Payload2, Payload3) -> sync_mirrors(QName, Config), amqp_channel:register_confirm_handler(Ch, self()), @@ -1283,12 +1379,14 @@ check_max_length_drops_head(Config, QName, Ch, Payload1, Payload2, Payload3) -> #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), %% A single message is published and consumed amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}), + wait_for_consensus(QName, Config), {#'basic.get_ok'{}, #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), %% Message 1 is replaced by message 2 amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}), amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}), + wait_for_consensus(QName, Config), {#'basic.get_ok'{}, #amqp_msg{payload = Payload2}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), @@ -1296,6 +1394,7 @@ check_max_length_drops_head(Config, QName, Ch, Payload1, Payload2, Payload3) -> amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}), amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}), amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload3}), + wait_for_consensus(QName, Config), {#'basic.get_ok'{}, #amqp_msg{payload = Payload3}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}). |
