diff options
| author | Diana Corbacho <diana@rabbitmq.com> | 2019-01-15 16:38:36 +0000 |
|---|---|---|
| committer | kjnilsson <knilsson@pivotal.io> | 2019-01-24 14:44:27 +0000 |
| commit | 08bded2fa243a15a9c7ced2b040066ad586104f1 (patch) | |
| tree | 5ad118b2cd3238605038a52ffcbd6a6711e2f026 | |
| parent | 267755365ab4fa2fd2568d86399806c442c7fd61 (diff) | |
| download | rabbitmq-server-git-08bded2fa243a15a9c7ced2b040066ad586104f1.tar.gz | |
Quorum queue queue length limit by byte size and number of messages
Only drop-head strategy.
This necessitated the change of rabbit_fifo prefix messages from a tuple
of integers representing the number of returned vs enqueued messages
that have already been processes and thus don't need to include message
bodes in the snapshot to a tuple of lists of the sizes of each message.
This change will have some performance impact as the snaphots will now
be larger than before but as they still won't contain message bodies at
least the sizing is fixed. Decreased the frequency as snapshots points
are prepared so somewhat make up for this.
[#161247380]
| -rw-r--r-- | src/rabbit_fifo.erl | 548 | ||||
| -rw-r--r-- | src/rabbit_fifo_index.erl | 26 | ||||
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 16 | ||||
| -rw-r--r-- | test/dynamic_qq_SUITE.erl | 5 | ||||
| -rw-r--r-- | test/quorum_queue_SUITE.erl | 69 | ||||
| -rw-r--r-- | test/quorum_queue_utils.erl | 9 | ||||
| -rw-r--r-- | test/rabbit_fifo_SUITE.erl | 11 | ||||
| -rw-r--r-- | test/rabbit_fifo_prop_SUITE.erl | 217 | ||||
| -rw-r--r-- | test/unit_inbroker_parallel_SUITE.erl | 207 |
9 files changed, 764 insertions, 344 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index b62ff62a51..9752e06e34 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -38,6 +38,7 @@ % queries query_messages_ready/1, query_messages_checked_out/1, + query_messages_total/1, query_processes/1, query_ra_indexes/1, query_consumer_count/1, @@ -87,8 +88,13 @@ -type msg() :: {msg_header(), raw_msg()}. %% message with a header map. +-type msg_size() :: non_neg_integer(). +%% the size in bytes of the msg payload + -type indexed_msg() :: {ra_index(), msg()}. +-type prefix_msg() :: {'$prefix_msg', msg_size()}. + -type delivery_msg() :: {msg_id(), msg()}. %% A tuple consisting of the message id and the headered message. @@ -157,7 +163,7 @@ -type applied_mfa() :: {module(), atom(), list()}. % represents a partially applied module call --define(SHADOW_COPY_INTERVAL, 4096 * 4). +-define(SHADOW_COPY_INTERVAL, 4096 * 8). -define(USE_AVG_HALF_LIFE, 10000.0). -record(consumer, @@ -189,6 +195,7 @@ suspected_down = false :: boolean() }). + -record(state, {name :: atom(), queue_resource :: rabbit_types:r('queue'), @@ -202,7 +209,8 @@ next_msg_num = 1 :: msg_in_id(), % list of returned msg_in_ids - when checking out it picks from % this list first before taking low_msg_num - returns = lqueue:new() :: lqueue:lqueue('$prefix_msg' | msg_in_id()), + returns = lqueue:new() :: lqueue:lqueue(prefix_msg() | + {msg_in_id(), indexed_msg()}), % a counter of enqueues - used to trigger shadow copy points enqueue_count = 0 :: non_neg_integer(), % a map containing all the live processes that have ever enqueued @@ -224,19 +232,20 @@ dead_letter_handler :: maybe(applied_mfa()), become_leader_handler :: maybe(applied_mfa()), %% This is a special field that is only used for snapshots - %% It represents the number of queued messages at the time the + %% It represents the queued messages at the time the %% dehydrated snapshot state was cached. %% As release_cursors are only emitted for raft indexes where all %% prior messages no longer contribute to the current state we can - %% replace all message payloads at some index with a single integer - %% to be decremented during `checkout_one' until it's 0 after which - %% it instead takes messages from the `messages' map. + %% replace all message payloads with their sizes (to be used for + %% overflow calculations). %% This is done so that consumers are still served in a deterministic %% order on recovery. - prefix_msg_counts = {0, 0} :: {Return :: non_neg_integer(), - PrefixMsgs :: non_neg_integer()}, + prefix_msgs = {[], []} :: {Return :: [msg_size()], + PrefixMsgs :: [msg_size()]}, msg_bytes_enqueue = 0 :: non_neg_integer(), msg_bytes_checkout = 0 :: non_neg_integer(), + max_length :: maybe(non_neg_integer()), + max_bytes :: maybe(non_neg_integer()), %% whether single active consumer is on or not for this queue consumer_strategy = default :: default | single_active, %% waiting consumers, one is picked active consumer is cancelled or dies @@ -251,6 +260,8 @@ dead_letter_handler => applied_mfa(), become_leader_handler => applied_mfa(), shadow_copy_interval => non_neg_integer(), + max_length => non_neg_integer(), + max_bytes => non_neg_integer(), single_active_consumer_on => boolean()}. -export_type([protocol/0, @@ -272,12 +283,14 @@ init(#{name := Name, queue_resource := Resource} = Conf) -> update_config(Conf, #state{name = Name, - queue_resource = Resource}). + queue_resource = Resource}). update_config(Conf, State) -> DLH = maps:get(dead_letter_handler, Conf, undefined), BLH = maps:get(become_leader_handler, Conf, undefined), SHI = maps:get(shadow_copy_interval, Conf, ?SHADOW_COPY_INTERVAL), + MaxLength = maps:get(max_length, Conf, undefined), + MaxBytes = maps:get(max_bytes, Conf, undefined), ConsumerStrategy = case maps:get(single_active_consumer_on, Conf, false) of true -> single_active; @@ -287,6 +300,8 @@ update_config(Conf, State) -> State#state{dead_letter_handler = DLH, become_leader_handler = BLH, shadow_copy_interval = SHI, + max_length = MaxLength, + max_bytes = MaxBytes, consumer_strategy = ConsumerStrategy}. zero(_) -> @@ -294,59 +309,49 @@ zero(_) -> % msg_ids are scoped per consumer % ra_indexes holds all raft indexes for enqueues currently on queue --spec apply(ra_machine:command_meta_data(), command(), - state()) -> - {state(), Reply :: term(), ra_machine:effects()}. -apply(#{index := RaftIdx}, #enqueue{pid = From, seq = Seq, - msg = RawMsg}, State00) -> - case maybe_enqueue(RaftIdx, From, Seq, RawMsg, [], State00) of - {ok, State0, Effects1} -> - %% need to checkout before capturing the shadow copy else - %% snapshots may not be complete - {State, ok, Effects} = checkout( - add_bytes_enqueue(RawMsg, State0), - Effects1), - append_to_master_index(RaftIdx, Effects, State); - {duplicate, State, Effects} -> - {State, ok, lists:reverse(Effects)} - end; -apply(#{index := RaftIdx}, +-spec apply(ra_machine:command_meta_data(), command(), state()) -> + {state(), Reply :: term(), ra_machine:effects()} | + {state(), Reply :: term()}. +apply(Metadata, #enqueue{pid = From, seq = Seq, + msg = RawMsg}, State00) -> + apply_enqueue(Metadata, From, Seq, RawMsg, State00); +apply(Meta, #settle{msg_ids = MsgIds, consumer_id = ConsumerId}, #state{consumers = Cons0} = State) -> case Cons0 of #{ConsumerId := Con0} -> % need to increment metrics before completing as any snapshot % states taken need to includ them - complete_and_checkout(RaftIdx, MsgIds, ConsumerId, + complete_and_checkout(Meta, MsgIds, ConsumerId, Con0, [], State); _ -> {State, ok} end; -apply(#{index := RaftIdx}, #discard{msg_ids = MsgIds, consumer_id = ConsumerId}, +apply(Meta, #discard{msg_ids = MsgIds, consumer_id = ConsumerId}, #state{consumers = Cons0} = State0) -> case Cons0 of #{ConsumerId := Con0} -> Discarded = maps:with(MsgIds, Con0#consumer.checked_out), Effects = dead_letter_effects(Discarded, State0, []), - complete_and_checkout(RaftIdx, MsgIds, ConsumerId, Con0, + complete_and_checkout(Meta, MsgIds, ConsumerId, Con0, Effects, State0); _ -> {State0, ok} end; -apply(_, #return{msg_ids = MsgIds, consumer_id = ConsumerId}, +apply(Meta, #return{msg_ids = MsgIds, consumer_id = ConsumerId}, #state{consumers = Cons0} = State) -> case Cons0 of #{ConsumerId := Con0 = #consumer{checked_out = Checked0}} -> Checked = maps:without(MsgIds, Checked0), Returned = maps:with(MsgIds, Checked0), MsgNumMsgs = maps:values(Returned), - return(ConsumerId, MsgNumMsgs, Con0, Checked, [], State); + return(Meta, ConsumerId, MsgNumMsgs, Con0, Checked, [], State); _ -> {State, ok} end; -apply(_, #credit{credit = NewCredit, delivery_count = RemoteDelCnt, - drain = Drain, consumer_id = ConsumerId}, +apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt, + drain = Drain, consumer_id = ConsumerId}, #state{consumers = Cons0, service_queue = ServiceQueue0} = State0) -> case Cons0 of @@ -359,7 +364,7 @@ apply(_, #credit{credit = NewCredit, delivery_count = RemoteDelCnt, ServiceQueue0), Cons = maps:put(ConsumerId, Con1, Cons0), {State1, ok, Effects} = - checkout(State0#state{service_queue = ServiceQueue, + checkout(Meta, State0#state{service_queue = ServiceQueue, consumers = Cons}, []), Response = {send_credit_reply, maps:size(State1#state.messages)}, %% by this point all checkouts for the updated credit value @@ -389,46 +394,45 @@ apply(_, #credit{credit = NewCredit, delivery_count = RemoteDelCnt, %% credit for unknown consumer - just ignore {State0, ok} end; -apply(_, #checkout{spec = {dequeue, _}}, - #state{messages = M, - prefix_msg_counts = {0, 0}} = State0) when map_size(M) == 0 -> - %% FIXME: also check if there are returned messages - %% TODO do we need metric visibility of empty get requests? - {State0, {dequeue, empty}}; -apply(Meta, #checkout{spec = {dequeue, settled}, meta = ConsumerMeta, +apply(Meta, #checkout{spec = {dequeue, Settlement}, + meta = ConsumerMeta, consumer_id = ConsumerId}, - State0) -> - % TODO: this clause could probably be optimised - State1 = update_consumer(ConsumerId, ConsumerMeta, - {once, 1, simple_prefetch}, State0), - % turn send msg effect into reply - {success, _, MsgId, Msg, State2} = checkout_one(State1), - % immediately settle - {State, _, Effects} = apply(Meta, make_settle(ConsumerId, [MsgId]), State2), - {State, {dequeue, {MsgId, Msg}}, Effects}; -apply(_, #checkout{spec = {dequeue, unsettled}, - meta = ConsumerMeta, consumer_id = {_, Pid} = ConsumerId}, - State0) -> - State1 = update_consumer(ConsumerId, ConsumerMeta, - {once, 1, simple_prefetch}, State0), - case checkout_one(State1) of - {success, _, MsgId, Msg, S} -> - {S, {dequeue, {MsgId, Msg}}, [{monitor, process, Pid}]}; - {inactive, S} -> - {S, {dequeue, empty}, [{aux, inactive}]}; - S -> - {S, {dequeue, empty}} + #state{consumers = Consumers} = State0) -> + Exists = maps:is_key(ConsumerId, Consumers), + case messages_ready(State0) of + 0 -> + {State0, {dequeue, empty}}; + _ when Exists -> + %% a dequeue using the same consumer_id isn't possible at this point + {State0, {dequeue, empty}}; + _ -> + State1 = update_consumer(ConsumerId, ConsumerMeta, + {once, 1, simple_prefetch}, State0), + {success, _, MsgId, Msg, State2} = checkout_one(State1), + case Settlement of + unsettled -> + {_, Pid} = ConsumerId, + {State2, {dequeue, {MsgId, Msg}}, + [{monitor, process, Pid}]}; + settled -> + %% immediately settle the checkout + {State, _, Effects} = apply(Meta, + make_settle(ConsumerId, [MsgId]), + State2), + {State, {dequeue, {MsgId, Msg}}, Effects} + end end; -apply(_, #checkout{spec = cancel, consumer_id = ConsumerId}, State0) -> +apply(Meta, #checkout{spec = cancel, consumer_id = ConsumerId}, State0) -> {State, Effects} = cancel_consumer(ConsumerId, State0, []), % TODO: here we should really demonitor the pid but _only_ if it has no - % other consumers or enqueuers. - checkout(State, Effects); -apply(_, #checkout{spec = Spec, meta = Meta, - consumer_id = {_, Pid} = ConsumerId}, + % other consumers or enqueuers. leaving a monitor in place isn't harmful + % however + checkout(Meta, State, Effects); +apply(Meta, #checkout{spec = Spec, meta = ConsumerMeta, + consumer_id = {_, Pid} = ConsumerId}, State0) -> - State1 = update_consumer(ConsumerId, Meta, Spec, State0), - checkout(State1, [{monitor, process, Pid}]); + State1 = update_consumer(ConsumerId, ConsumerMeta, Spec, State0), + checkout(Meta, State1, [{monitor, process, Pid}]); apply(#{index := RaftIdx}, #purge{}, #state{ra_indexes = Indexes0, messages = Messages} = State0) -> @@ -486,7 +490,7 @@ apply(_, {down, ConsumerPid, noconnection}, %% TODO: should we run a checkout here? {State#state{consumers = Cons, enqueuers = Enqs, waiting_consumers = WaitingConsumers}, ok, Effects}; -apply(_, {down, Pid, _Info}, #state{consumers = Cons0, +apply(Meta, {down, Pid, _Info}, #state{consumers = Cons0, enqueuers = Enqs0} = State0) -> % Remove any enqueuer for the same pid and enqueue any pending messages % This should be ok as we won't see any more enqueues from this pid @@ -506,8 +510,8 @@ apply(_, {down, Pid, _Info}, #state{consumers = Cons0, {State, Effects} = lists:foldl(fun(ConsumerId, {S, E}) -> cancel_consumer(ConsumerId, S, E) end, {State2, Effects1}, DownConsumers), - checkout(State, Effects); -apply(_, {nodeup, Node}, #state{consumers = Cons0, + checkout(Meta, State, Effects); +apply(Meta, {nodeup, Node}, #state{consumers = Cons0, enqueuers = Enqs0, service_queue = SQ0} = State0) -> %% A node we are monitoring has come back. @@ -535,13 +539,13 @@ apply(_, {nodeup, Node}, #state{consumers = Cons0, Acc end, {Cons0, SQ0, Monitors}, Cons0), % TODO: avoid list concat - checkout(State0#state{consumers = Cons1, enqueuers = Enqs1, - service_queue = SQ, - waiting_consumers = WaitingConsumers}, Effects); + checkout(Meta, State0#state{consumers = Cons1, enqueuers = Enqs1, + service_queue = SQ, + waiting_consumers = WaitingConsumers}, Effects); apply(_, {nodedown, _Node}, State) -> {State, ok}; -apply(_, #update_config{config = Conf}, State) -> - {update_config(Conf, State), ok}. +apply(Meta, #update_config{config = Conf}, State) -> + checkout(Meta, update_config(Conf, State), []). handle_waiting_consumer_down(_Pid, #state{consumer_strategy = default} = State) -> @@ -592,7 +596,7 @@ state_enter(leader, #state{consumers = Cons, enqueuers = Enqs, waiting_consumers = WaitingConsumers, name = Name, - prefix_msg_counts = {0, 0}, + prefix_msgs = {[], []}, become_leader_handler = BLH}) -> % return effects to monitor all current consumers and enqueuers Pids = lists:usort(maps:keys(Enqs) @@ -608,10 +612,10 @@ state_enter(leader, #state{consumers = Cons, {Mod, Fun, Args} -> [{mod_call, Mod, Fun, Args ++ [Name]} | Effects] end; -state_enter(recovered, #state{prefix_msg_counts = PrefixMsgCounts}) - when PrefixMsgCounts =/= {0, 0} -> +state_enter(recovered, #state{prefix_msgs = PrefixMsgCounts}) + when PrefixMsgCounts =/= {[], []} -> %% TODO: remove assertion? - exit({rabbit_fifo, unexpected_prefix_msg_counts, PrefixMsgCounts}); + exit({rabbit_fifo, unexpected_prefix_msgs, PrefixMsgCounts}); state_enter(eol, #state{enqueuers = Enqs, consumers = Custs0, waiting_consumers = WaitingConsumers0}) -> @@ -629,14 +633,12 @@ state_enter(_, _) -> -spec tick(non_neg_integer(), state()) -> ra_machine:effects(). tick(_Ts, #state{name = Name, queue_resource = QName, - messages = Messages, - ra_indexes = Indexes, msg_bytes_enqueue = EnqueueBytes, msg_bytes_checkout = CheckoutBytes} = State) -> Metrics = {Name, - maps:size(Messages), % Ready + messages_ready(State), num_checked_out(State), % checked out - rabbit_fifo_index:size(Indexes), %% Total + messages_total(State), query_consumer_count(State), % Consumers EnqueueBytes, CheckoutBytes}, @@ -649,8 +651,7 @@ overview(#state{consumers = Cons, messages = Messages, ra_indexes = Indexes, msg_bytes_enqueue = EnqueueBytes, - msg_bytes_checkout = CheckoutBytes - } = State) -> + msg_bytes_checkout = CheckoutBytes} = State) -> #{type => ?MODULE, num_consumers => maps:size(Cons), num_checked_out => num_checked_out(State), @@ -693,13 +694,17 @@ handle_aux(_, cast, Cmd, {Name, Use0}, Log, _) -> %%% Queries -query_messages_ready(#state{messages = M}) -> - M. +%% TODO: this doesn't take returns into account +query_messages_ready(State) -> + messages_ready(State). query_messages_checked_out(#state{consumers = Consumers}) -> maps:fold(fun (_, #consumer{checked_out = C}, S) -> - maps:merge(S, maps:from_list(maps:values(C))) - end, #{}, Consumers). + maps:size(C) + S + end, 0, Consumers). + +query_messages_total(State) -> + messages_total(State). query_processes(#state{enqueuers = Enqs, consumers = Cons0}) -> Cons = maps:fold(fun({_, P}, V, S) -> S#{P => V} end, #{}, Cons0), @@ -770,6 +775,18 @@ usage(Name) when is_atom(Name) -> %%% Internal +messages_ready(#state{messages = M, + prefix_msgs = {PreR, PreM}, + returns = R}) -> + + %% TODO: optimise to avoid length/1 call + maps:size(M) + lqueue:len(R) + length(PreR) + length(PreM). + +messages_total(#state{ra_indexes = I, + prefix_msgs = {PreR, PreM}}) -> + + rabbit_fifo_index:size(I) + length(PreR) + length(PreM). + update_use({inactive, _, _, _} = CUInfo, inactive) -> CUInfo; update_use({active, _, _} = CUInfo, active) -> @@ -879,6 +896,35 @@ cancel_consumer0(ConsumerId, {S0, Effects0} end. +apply_enqueue(#{index := RaftIdx} = Meta, From, Seq, RawMsg, State0) -> + Bytes = message_size(RawMsg), + case maybe_enqueue(RaftIdx, From, Seq, RawMsg, [], State0) of + {ok, State1, Effects1} -> + State2 = append_to_master_index(RaftIdx, + add_bytes_enqueue(Bytes, State1)), + {State, ok, Effects} = checkout(Meta, State2, Effects1), + {maybe_store_dehydrated_state(RaftIdx, State), ok, Effects}; + {duplicate, State, Effects} -> + {State, ok, Effects} + end. + +drop_head(#state{ra_indexes = Indexes0} = State0, Effects0) -> + case take_next_msg(State0) of + {FullMsg = {_MsgId, {RaftIdxToDrop, {_Header, Msg}}}, + State1} -> + Indexes = rabbit_fifo_index:delete(RaftIdxToDrop, Indexes0), + Bytes = message_size(Msg), + State = add_bytes_drop(Bytes, State1#state{ra_indexes = Indexes}), + Effects = dead_letter_effects(maps:put(none, FullMsg, #{}), + State, Effects0), + {State, Effects}; + {{'$prefix_msg', Bytes}, State1} -> + State = add_bytes_drop(Bytes, State1), + {State, Effects0}; + empty -> + {State0, Effects0} + end. + enqueue(RaftIdx, RawMsg, #state{messages = Messages, low_msg_num = LowMsgNum, next_msg_num = NextMsgNum} = State0) -> @@ -889,19 +935,28 @@ enqueue(RaftIdx, RawMsg, #state{messages = Messages, low_msg_num = min(LowMsgNum, NextMsgNum), next_msg_num = NextMsgNum + 1}. -append_to_master_index(RaftIdx, Effects, +append_to_master_index(RaftIdx, #state{ra_indexes = Indexes0} = State0) -> - {State, Shadow} = incr_enqueue_count(State0), - Indexes = rabbit_fifo_index:append(RaftIdx, Shadow, Indexes0), - {State#state{ra_indexes = Indexes}, ok, Effects}. + State = incr_enqueue_count(State0), + Indexes = rabbit_fifo_index:append(RaftIdx, undefined, Indexes0), + State#state{ra_indexes = Indexes}. incr_enqueue_count(#state{enqueue_count = C, shadow_copy_interval = C} = State0) -> - % time to stash a dehydrated state version - State = State0#state{enqueue_count = 0}, - {State, dehydrate_state(State)}; + % this will trigger a dehydrated version of the state to be stored + % at this raft index for potential future snapshot generation + State0#state{enqueue_count = 0}; incr_enqueue_count(#state{enqueue_count = C} = State) -> - {State#state{enqueue_count = C + 1}, undefined}. + State#state{enqueue_count = C + 1}. + +maybe_store_dehydrated_state(RaftIdx, #state{enqueue_count = 0, + ra_indexes = Indexes} = State) -> + Dehydrated = dehydrate_state(State), + State#state{ra_indexes = + rabbit_fifo_index:update_if_present(RaftIdx, Dehydrated, + Indexes)}; +maybe_store_dehydrated_state(_RaftIdx, State) -> + State. enqueue_pending(From, @@ -916,7 +971,8 @@ enqueue_pending(From, Enq, #state{enqueuers = Enqueuers0} = State) -> maybe_enqueue(RaftIdx, undefined, undefined, RawMsg, Effects, State0) -> % direct enqueue without tracking - {ok, enqueue(RaftIdx, RawMsg, State0), Effects}; + State = enqueue(RaftIdx, RawMsg, State0), + {ok, State, Effects}; maybe_enqueue(RaftIdx, From, MsgSeqNo, RawMsg, Effects0, #state{enqueuers = Enqueuers0} = State0) -> case maps:get(From, Enqueuers0, undefined) of @@ -947,19 +1003,19 @@ maybe_enqueue(RaftIdx, From, MsgSeqNo, RawMsg, Effects0, snd(T) -> element(2, T). -return(ConsumerId, MsgNumMsgs, Con0, Checked, +return(Meta, ConsumerId, MsgNumMsgs, Con0, Checked, Effects0, #state{consumers = Cons0, service_queue = SQ0} = State0) -> Con = Con0#consumer{checked_out = Checked, credit = increase_credit(Con0, length(MsgNumMsgs))}, {Cons, SQ, Effects} = update_or_remove_sub(ConsumerId, Con, Cons0, SQ0, Effects0), - State1 = lists:foldl(fun('$prefix_msg' = Msg, S0) -> + State1 = lists:foldl(fun({'$prefix_msg', _} = Msg, S0) -> return_one(0, Msg, S0); ({MsgNum, Msg}, S0) -> return_one(MsgNum, Msg, S0) end, State0, MsgNumMsgs), - checkout(State1#state{consumers = Cons, - service_queue = SQ}, + checkout(Meta, State1#state{consumers = Cons, + service_queue = SQ}, Effects). % used to processes messages that are finished @@ -990,7 +1046,7 @@ increase_credit(#consumer{lifetime = auto, increase_credit(#consumer{credit = Current}, Credit) -> Current + Credit. -complete_and_checkout(IncomingRaftIdx, MsgIds, ConsumerId, +complete_and_checkout(#{index := IncomingRaftIdx} = Meta, MsgIds, ConsumerId, #consumer{checked_out = Checked0} = Con0, Effects0, #state{ra_indexes = Indexes0} = State0) -> Checked = maps:without(MsgIds, Checked0), @@ -998,15 +1054,15 @@ complete_and_checkout(IncomingRaftIdx, MsgIds, ConsumerId, MsgRaftIdxs = [RIdx || {_, {RIdx, _}} <- maps:values(Discarded)], State1 = lists:foldl(fun({_, {_, {_, RawMsg}}}, Acc) -> add_bytes_settle(RawMsg, Acc); - (_, Acc) -> - Acc + ({'$prefix_msg', _} = M, Acc) -> + add_bytes_settle(M, Acc) end, State0, maps:values(Discarded)), %% need to pass the length of discarded as $prefix_msgs would be filtered %% by the above list comprehension {State2, Effects1} = complete(ConsumerId, MsgRaftIdxs, maps:size(Discarded), Con0, Checked, Effects0, State1), - {State, ok, Effects} = checkout(State2, Effects1), + {State, ok, Effects} = checkout(Meta, State2, Effects1), % settle metrics are incremented separately update_smallest_raft_index(IncomingRaftIdx, Indexes0, State, Effects). @@ -1016,7 +1072,7 @@ dead_letter_effects(_Discarded, Effects; dead_letter_effects(Discarded, #state{dead_letter_handler = {Mod, Fun, Args}}, Effects) -> - DeadLetters = maps:fold(fun(_, {_, {_, {_, Msg}}}, + DeadLetters = maps:fold(fun(_, {_, {_, {_Header, Msg}}}, % MsgId, MsgIdID, RaftId, Header Acc) -> [{rejected, Msg} | Acc] end, [], Discarded), @@ -1028,7 +1084,6 @@ cancel_consumer_effects(ConsumerId, #state{queue_resource = QName}, Effects) -> update_smallest_raft_index(IncomingRaftIdx, OldIndexes, #state{ra_indexes = Indexes, - % prefix_msg_count = 0, messages = Messages} = State, Effects) -> case rabbit_fifo_index:size(Indexes) of 0 when map_size(Messages) =:= 0 -> @@ -1057,35 +1112,45 @@ update_smallest_raft_index(IncomingRaftIdx, OldIndexes, end. % TODO update message then update messages and returns in single operations -return_one(0, '$prefix_msg', +return_one(0, {'$prefix_msg', _} = Msg, #state{returns = Returns} = State0) -> - State0#state{returns = lqueue:in('$prefix_msg', Returns)}; + add_bytes_return(Msg, + State0#state{returns = lqueue:in(Msg, Returns)}); return_one(MsgNum, {RaftId, {Header0, RawMsg}}, - #state{messages = Messages, - returns = Returns} = State0) -> + #state{returns = Returns} = State0) -> Header = maps:update_with(delivery_count, fun (C) -> C+1 end, 1, Header0), Msg = {RaftId, {Header, RawMsg}}, % this should not affect the release cursor in any way add_bytes_return(RawMsg, - State0#state{messages = maps:put(MsgNum, Msg, Messages), - returns = lqueue:in(MsgNum, Returns)}). + State0#state{returns = lqueue:in({MsgNum, Msg}, Returns)}). -return_all(State, Checked0) -> +return_all(State0, Checked0) -> %% need to sort the list so that we return messages in the order %% they were checked out Checked = lists:sort(maps:to_list(Checked0)), - lists:foldl(fun ({_, '$prefix_msg'}, S) -> - return_one(0, '$prefix_msg', S); + lists:foldl(fun ({_, {'$prefix_msg', _} = Msg}, S) -> + return_one(0, Msg, S); ({_, {MsgNum, Msg}}, S) -> return_one(MsgNum, Msg, S) - end, State, Checked). + end, State0, Checked). + + %% checkout new messages to consumers %% reverses the effects list -checkout(State, Effects) -> - checkout0(checkout_one(State), Effects, #{}). +checkout(#{index := Index}, State0, Effects0) -> + {State1, _Result, Effects1} = checkout0(checkout_one(State0), + Effects0, #{}), + case evaluate_limit(State0#state.ra_indexes, false, + State1, Effects1) of + {State, true, Effects} -> + update_smallest_raft_index(Index, State0#state.ra_indexes, + State, Effects); + {State, false, Effects} -> + {State, ok, Effects} + end. checkout0({success, ConsumerId, MsgId, Msg, State}, Effects, Acc0) -> DelMsg = {MsgId, Msg}, @@ -1093,12 +1158,31 @@ checkout0({success, ConsumerId, MsgId, Msg, State}, Effects, Acc0) -> fun (M) -> [DelMsg | M] end, [DelMsg], Acc0), checkout0(checkout_one(State), Effects, Acc); -checkout0({inactive, State}, Effects0, Acc) -> - Effects = append_send_msg_effects(Effects0, Acc), - {State, ok, lists:reverse([{aux, inactive} | Effects])}; -checkout0(State, Effects0, Acc) -> - Effects = append_send_msg_effects(Effects0, Acc), - {State, ok, lists:reverse(Effects)}. +checkout0({Activity, State0}, Effects0, Acc) -> + Effects1 = case Activity of + nochange -> + append_send_msg_effects(Effects0, Acc); + inactive -> + [{aux, inactive} + | append_send_msg_effects(Effects0, Acc)] + end, + {State0, ok, lists:reverse(Effects1)}. + +evaluate_limit(_OldIndexes, Result, + #state{max_length = undefined, + max_bytes = undefined} = State, + Effects) -> + {State, Result, Effects}; +evaluate_limit(OldIndexes, Result, + State0, Effects0) -> + case is_over_limit(State0) of + true -> + {State, Effects} = drop_head(State0, Effects0), + evaluate_limit(OldIndexes, true, State, Effects); + false -> + {State0, Result, Effects0} + %% TODO: optimisation: avoid calling if nothing has been dropped? + end. append_send_msg_effects(Effects, AccMap) when map_size(AccMap) == 0 -> Effects; @@ -1108,57 +1192,51 @@ append_send_msg_effects(Effects0, AccMap) -> end, Effects0, AccMap), [{aux, active} | Effects]. -next_checkout_message(#state{prefix_msg_counts = {PReturns, P}} = State) - when PReturns > 0 -> +%% next message is determined as follows: +%% First we check if there are are prefex returns +%% Then we check if there are current returns +%% then we check prefix msgs +%% then we check current messages +%% +%% When we return it is always done to the current return queue +%% for both prefix messages and current messages +take_next_msg(#state{prefix_msgs = {[Bytes | Rem], P}} = State) -> %% there are prefix returns, these should be served first - {'$prefix_msg', State#state{prefix_msg_counts = {PReturns - 1, P}}}; -next_checkout_message(#state{returns = Returns, - low_msg_num = Low0, - prefix_msg_counts = {R, P}, - next_msg_num = NextMsgNum} = State) -> + {{'$prefix_msg', Bytes}, + State#state{prefix_msgs = {Rem, P}}}; +take_next_msg(#state{returns = Returns, + low_msg_num = Low0, + messages = Messages0, + prefix_msgs = {R, P}} = State) -> %% use peek rather than out there as the most likely case is an empty %% queue case lqueue:peek(Returns) of - {value, Next} -> - {Next, State#state{returns = lqueue:drop(Returns)}}; - empty when P == 0 -> + {value, NextMsg} -> + {NextMsg, + State#state{returns = lqueue:drop(Returns)}}; + empty when P == [] -> case Low0 of undefined -> - {undefined, State}; + empty; _ -> - case Low0 + 1 of - NextMsgNum -> - %% the map will be empty after this item is removed - {Low0, State#state{low_msg_num = undefined}}; - Low -> - {Low0, State#state{low_msg_num = Low}} + %% TODO: defensive? + {Msg, Messages} = maps:take(Low0, Messages0), + case maps:size(Messages) of + 0 -> + {{Low0, Msg}, + State#state{messages = Messages, + low_msg_num = undefined}}; + _ -> + {{Low0, Msg}, + State#state{messages = Messages, + low_msg_num = Low0 + 1}} end end; empty -> + [Bytes | Rem] = P, %% There are prefix msgs - {'$prefix_msg', State#state{prefix_msg_counts = {R, P - 1}}} - end. - -%% next message is determined as follows: -%% First we check if there are are prefex returns -%% Then we check if there are current returns -%% then we check prefix msgs -%% then we check current messages -%% -%% When we return it is always done to the current return queue -%% for both prefix messages and current messages -take_next_msg(#state{messages = Messages0} = State0) -> - case next_checkout_message(State0) of - {'$prefix_msg', State} -> - {'$prefix_msg', State, Messages0}; - {NextMsgInId, State} -> - %% messages are available - case maps:take(NextMsgInId, Messages0) of - {IdxMsg, Messages} -> - {{NextMsgInId, IdxMsg}, State, Messages}; - error -> - error - end + {{'$prefix_msg', Bytes}, + State#state{prefix_msgs = {R, Rem}}} end. send_msg_effect({CTag, CPid}, Msgs) -> @@ -1170,7 +1248,7 @@ checkout_one(#state{service_queue = SQ0, case queue:peek(SQ0) of {value, ConsumerId} -> case take_next_msg(InitState) of - {ConsumerMsg, State0, Messages} -> + {ConsumerMsg, State0} -> SQ1 = queue:drop(SQ0), %% there are consumers waiting to be serviced %% process consumer checkout @@ -1195,31 +1273,31 @@ checkout_one(#state{service_queue = SQ0, update_or_remove_sub(ConsumerId, Con, Cons0, SQ1, []), State1 = State0#state{service_queue = SQ, - messages = Messages, consumers = Cons}, {State, Msg} = case ConsumerMsg of - '$prefix_msg' -> - {State1, '$prefix_msg'}; + {'$prefix_msg', _} -> + {add_bytes_checkout(ConsumerMsg, State1), + ConsumerMsg}; {_, {_, {_, RawMsg} = M}} -> - {add_bytes_checkout(RawMsg, State1), M} + {add_bytes_checkout(RawMsg, State1), + M} end, {success, ConsumerId, Next, Msg, State}; error -> %% consumer did not exist but was queued, recurse checkout_one(InitState#state{service_queue = SQ1}) end; - error -> - InitState + empty -> + {nochange, InitState} end; empty -> case maps:size(Messages0) of - 0 -> InitState; + 0 -> {nochange, InitState}; _ -> {inactive, InitState} end end. - update_or_remove_sub(ConsumerId, #consumer{lifetime = auto, credit = 0} = Con, Cons, ServiceQueue, Effects) -> @@ -1262,14 +1340,14 @@ update_consumer(ConsumerId, Meta, Spec, %% general case, single active consumer off update_consumer0(ConsumerId, Meta, Spec, State0); update_consumer(ConsumerId, Meta, Spec, - #state{consumers = Cons0, + #state{consumers = Cons0, consumer_strategy = single_active} = State0) when map_size(Cons0) == 0 -> %% single active consumer on, no one is consuming yet update_consumer0(ConsumerId, Meta, Spec, State0); update_consumer(ConsumerId, Meta, {Life, Credit, Mode}, - #state{consumer_strategy = single_active, - waiting_consumers = WaitingConsumers0} = State0) -> + #state{consumer_strategy = single_active, + waiting_consumers = WaitingConsumers0} = State0) -> %% single active consumer on and one active consumer already %% adding the new consumer to the waiting list Consumer = #consumer{lifetime = Life, meta = Meta, @@ -1289,11 +1367,10 @@ update_consumer0(ConsumerId, Meta, {Life, Credit, Mode}, %% the credit update N = maps:size(S#consumer.checked_out), C = max(0, Credit - N), - S#consumer{lifetime = Life, - credit = C} + S#consumer{lifetime = Life, credit = C} end, Init, Cons0), ServiceQueue = maybe_queue_consumer(ConsumerId, maps:get(ConsumerId, Cons), - ServiceQueue0), + ServiceQueue0), State0#state{consumers = Cons, service_queue = ServiceQueue}. @@ -1313,13 +1390,20 @@ maybe_queue_consumer(ConsumerId, #consumer{credit = Credit}, dehydrate_state(#state{messages = Messages, consumers = Consumers, returns = Returns, - prefix_msg_counts = {PrefRetCnt, MsgCount}} = State) -> - %% TODO: optimise to avoid having to iterate the queue to get the number - %% of current returned messages - RetLen = lqueue:len(Returns), % O(1) - CurReturns = length([R || R <- lqueue:to_list(Returns), - R =/= '$prefix_msg']), - PrefixMsgCnt = MsgCount + maps:size(Messages) - CurReturns, + prefix_msgs = {PrefRet0, PrefMsg0}} = State) -> + %% TODO: optimise this function as far as possible + PrefRet = lists:foldl(fun ({'$prefix_msg', Bytes}, Acc) -> + [Bytes | Acc]; + ({_, {_, {_, Raw}}}, Acc) -> + [message_size(Raw) | Acc] + end, + lists:reverse(PrefRet0), + lqueue:to_list(Returns)), + PrefMsgs = lists:foldl(fun ({_, {_RaftIdx, {_H, Raw}}}, Acc) -> + [message_size(Raw) | Acc] + end, + lists:reverse(PrefMsg0), + lists:sort(maps:to_list(Messages))), State#state{messages = #{}, ra_indexes = rabbit_fifo_index:empty(), low_msg_num = undefined, @@ -1327,14 +1411,26 @@ dehydrate_state(#state{messages = Messages, dehydrate_consumer(C) end, Consumers), returns = lqueue:new(), - %% messages include returns - prefix_msg_counts = {RetLen + PrefRetCnt, - PrefixMsgCnt}}. + prefix_msgs = {lists:reverse(PrefRet), + lists:reverse(PrefMsgs)}}. dehydrate_consumer(#consumer{checked_out = Checked0} = Con) -> - Checked = maps:map(fun (_, _) -> '$prefix_msg' end, Checked0), + Checked = maps:map(fun (_, {'$prefix_msg', _} = M) -> + M; + (_, {_, {_, {_, Raw}}}) -> + {'$prefix_msg', message_size(Raw)} + end, Checked0), Con#consumer{checked_out = Checked}. +is_over_limit(#state{max_length = undefined, + max_bytes = undefined}) -> + false; +is_over_limit(#state{max_length = MaxLength, + max_bytes = MaxBytes, + msg_bytes_enqueue = BytesEnq} = State) -> + + messages_ready(State) > MaxLength orelse (BytesEnq > MaxBytes). + -spec make_enqueue(maybe(pid()), maybe(msg_seqno()), raw_msg()) -> protocol(). make_enqueue(Pid, Seq, Msg) -> #enqueue{pid = Pid, seq = Seq, msg = Msg}. @@ -1371,10 +1467,12 @@ make_purge() -> #purge{}. make_update_config(Config) -> #update_config{config = Config}. -add_bytes_enqueue(Msg, #state{msg_bytes_enqueue = Enqueue} = State) -> - Bytes = message_size(Msg), +add_bytes_enqueue(Bytes, #state{msg_bytes_enqueue = Enqueue} = State) -> State#state{msg_bytes_enqueue = Enqueue + Bytes}. +add_bytes_drop(Bytes, #state{msg_bytes_enqueue = Enqueue} = State) -> + State#state{msg_bytes_enqueue = Enqueue - Bytes}. + add_bytes_checkout(Msg, #state{msg_bytes_checkout = Checkout, msg_bytes_enqueue = Enqueue } = State) -> Bytes = message_size(Msg), @@ -1394,6 +1492,8 @@ add_bytes_return(Msg, #state{msg_bytes_checkout = Checkout, message_size(#basic_message{content = Content}) -> #content{payload_fragments_rev = PFR} = Content, iolist_size(PFR); +message_size({'$prefix_msg', B}) -> + B; message_size(B) when is_binary(B) -> byte_size(B); message_size(Msg) -> @@ -1554,7 +1654,7 @@ enq_enq_deq_deq_settle_test() -> {State3, {dequeue, {0, {_, first}}}, [{monitor, _, _}]} = apply(meta(3), make_checkout(Cid, {dequeue, unsettled}, #{}), State2), - {_State4, {dequeue, empty}, _} = + {_State4, {dequeue, empty}} = apply(meta(4), make_checkout(Cid, {dequeue, unsettled}, #{}), State3), ok. @@ -1599,7 +1699,7 @@ release_cursor_test() -> checkout_enq_settle_test() -> Cid = {?FUNCTION_NAME, self()}, - {State1, [{monitor, _, _}]} = check(Cid, 1, test_init(test)), + {State1, [{monitor, _, _} | _]} = check(Cid, 1, test_init(test)), {State2, Effects0} = enq(2, 1, first, State1), ?ASSERT_EFF({send_msg, _, {delivery, ?FUNCTION_NAME, @@ -1614,7 +1714,7 @@ checkout_enq_settle_test() -> out_of_order_enqueue_test() -> Cid = {?FUNCTION_NAME, self()}, - {State1, [{monitor, _, _}]} = check_n(Cid, 5, 5, test_init(test)), + {State1, [{monitor, _, _} | _]} = check_n(Cid, 5, 5, test_init(test)), {State2, Effects2} = enq(2, 1, first, State1), ?ASSERT_EFF({send_msg, _, {delivery, _, [{_, {_, first}}]}, _}, Effects2), % assert monitor was set up @@ -1644,7 +1744,7 @@ out_of_order_first_enqueue_test() -> duplicate_enqueue_test() -> Cid = {<<"duplicate_enqueue_test">>, self()}, - {State1, [{monitor, _, _}]} = check_n(Cid, 5, 5, test_init(test)), + {State1, [{monitor, _, _} | _]} = check_n(Cid, 5, 5, test_init(test)), {State2, Effects2} = enq(2, 1, first, State1), ?ASSERT_EFF({send_msg, _, {delivery, _, [{_, {_, first}}]}, _}, Effects2), {_State3, Effects3} = enq(3, 1, first, State2), @@ -1663,10 +1763,10 @@ return_checked_out_test() -> {State0, [_, _]} = enq(1, 1, first, test_init(test)), {State1, [_Monitor, {send_msg, _, {delivery, _, [{MsgId, _}]}, ra_event}, - {aux, active} + {aux, active} | _ ]} = check(Cid, 2, State0), % return - {_State2, _, [_, _]} = apply(meta(3), make_return(Cid, [MsgId]), State1), + {_State2, _, [_]} = apply(meta(3), make_return(Cid, [MsgId]), State1), ok. return_auto_checked_out_test() -> @@ -1695,7 +1795,8 @@ cancelled_checkout_out_test() -> {State1, _} = check_auto(Cid, 2, State0), % cancelled checkout should return all pending messages to queue {State2, _, _} = apply(meta(3), make_checkout(Cid, cancel, #{}), State1), - ?assertEqual(2, maps:size(State2#state.messages)), + ?assertEqual(1, maps:size(State2#state.messages)), + ?assertEqual(1, lqueue:len(State2#state.returns)), {State3, {dequeue, {0, {_, first}}}, _} = apply(meta(3), make_checkout(Cid, {dequeue, settled}, #{}), State2), @@ -1748,14 +1849,14 @@ down_with_noconnection_returns_unack_test() -> ?assertEqual(0, maps:size(State1#state.messages)), ?assertEqual(0, lqueue:len(State1#state.returns)), {State2a, _, _} = apply(meta(3), {down, Pid, noconnection}, State1), - ?assertEqual(1, maps:size(State2a#state.messages)), + ?assertEqual(0, maps:size(State2a#state.messages)), ?assertEqual(1, lqueue:len(State2a#state.returns)), ok. down_with_noproc_enqueuer_is_cleaned_up_test() -> State00 = test_init(test), Pid = spawn(fun() -> ok end), - {State0, _, Effects0} = apply(meta(1), {enqueue, Pid, 1, first}, State00), + {State0, _, Effects0} = apply(meta(1), make_enqueue(Pid, 1, first), State00), ?ASSERT_EFF({monitor, process, _}, Effects0), {State1, _, _} = apply(meta(3), {down, Pid, noproc}, State0), % ensure there are no enqueuers @@ -2116,7 +2217,7 @@ single_active_consumer_test() -> % adding some consumers AddConsumer = fun(CTag, State) -> {NewState, _, _} = apply( - #{}, + meta(1), #checkout{spec = {once, 1, simple_prefetch}, meta = #{}, consumer_id = {CTag, self()}}, @@ -2134,7 +2235,8 @@ single_active_consumer_test() -> ?assertNotEqual(false, lists:keyfind({<<"ctag4">>, self()}, 1, State1#state.waiting_consumers)), % cancelling a waiting consumer - {State2, _, Effects1} = apply(#{}, #checkout{spec = cancel, consumer_id = {<<"ctag3">>, self()}}, State1), + {State2, _, Effects1} = apply(meta(2), + #checkout{spec = cancel, consumer_id = {<<"ctag3">>, self()}}, State1), % the active consumer should still be in place ?assertEqual(1, map_size(State2#state.consumers)), ?assert(maps:is_key({<<"ctag1">>, self()}, State2#state.consumers)), @@ -2146,7 +2248,7 @@ single_active_consumer_test() -> ?assertEqual(1, length(Effects1)), % cancelling the active consumer - {State3, _, Effects2} = apply(#{}, #checkout{spec = cancel, consumer_id = {<<"ctag1">>, self()}}, State2), + {State3, _, Effects2} = apply(meta(3), #checkout{spec = cancel, consumer_id = {<<"ctag1">>, self()}}, State2), % the second registered consumer is now the active one ?assertEqual(1, map_size(State3#state.consumers)), ?assert(maps:is_key({<<"ctag2">>, self()}, State3#state.consumers)), @@ -2157,7 +2259,7 @@ single_active_consumer_test() -> ?assertEqual(2, length(Effects2)), % cancelling the active consumer - {State4, _, Effects3} = apply(#{}, #checkout{spec = cancel, consumer_id = {<<"ctag2">>, self()}}, State3), + {State4, _, Effects3} = apply(meta(4), #checkout{spec = cancel, consumer_id = {<<"ctag2">>, self()}}, State3), % the last waiting consumer became the active one ?assertEqual(1, map_size(State4#state.consumers)), ?assert(maps:is_key({<<"ctag4">>, self()}, State4#state.consumers)), @@ -2167,7 +2269,7 @@ single_active_consumer_test() -> ?assertEqual(2, length(Effects3)), % cancelling the last consumer - {State5, _, Effects4} = apply(#{}, #checkout{spec = cancel, consumer_id = {<<"ctag4">>, self()}}, State4), + {State5, _, Effects4} = apply(meta(5), #checkout{spec = cancel, consumer_id = {<<"ctag4">>, self()}}, State4), % no active consumer anymore ?assertEqual(0, map_size(State5#state.consumers)), % still nothing in the waiting list @@ -2192,7 +2294,7 @@ single_active_consumer_cancel_consumer_when_channel_is_down_test() -> % adding some consumers AddConsumer = fun({CTag, ChannelId}, State) -> {NewState, _, _} = apply( - #{}, + #{index => 1}, #checkout{spec = {once, 1, simple_prefetch}, meta = #{}, consumer_id = {CTag, ChannelId}}, @@ -2203,16 +2305,17 @@ single_active_consumer_cancel_consumer_when_channel_is_down_test() -> [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]), % the channel of the active consumer goes down - {State2, _, Effects} = apply(#{}, {down, Pid1, doesnotmatter}, State1), + {State2, _, Effects} = apply(#{index => 2}, {down, Pid1, doesnotmatter}, State1), % fell back to another consumer ?assertEqual(1, map_size(State2#state.consumers)), % there are still waiting consumers ?assertEqual(2, length(State2#state.waiting_consumers)), - % effects to unregister the consumer and to update the new active one (metrics) are there + % effects to unregister the consumer and + % to update the new active one (metrics) are there ?assertEqual(2, length(Effects)), % the channel of the active consumer and a waiting consumer goes down - {State3, _, Effects2} = apply(#{}, {down, Pid2, doesnotmatter}, State2), + {State3, _, Effects2} = apply(#{index => 3}, {down, Pid2, doesnotmatter}, State2), % fell back to another consumer ?assertEqual(1, map_size(State3#state.consumers)), % no more waiting consumer @@ -2221,7 +2324,7 @@ single_active_consumer_cancel_consumer_when_channel_is_down_test() -> ?assertEqual(3, length(Effects2)), % the last channel goes down - {State4, _, Effects3} = apply(#{}, {down, Pid3, doesnotmatter}, State3), + {State4, _, Effects3} = apply(#{index => 4}, {down, Pid3, doesnotmatter}, State3), % no more consumers ?assertEqual(0, map_size(State4#state.consumers)), ?assertEqual(0, length(State4#state.waiting_consumers)), @@ -2237,10 +2340,11 @@ single_active_consumer_mark_waiting_consumers_as_suspected_when_down_noconnnecti shadow_copy_interval => 0, single_active_consumer_on => true}), + Meta = #{index => 1}, % adding some consumers AddConsumer = fun(CTag, State) -> {NewState, _, _} = apply( - #{}, + Meta, #checkout{spec = {once, 1, simple_prefetch}, meta = #{}, consumer_id = {CTag, self()}}, @@ -2259,7 +2363,7 @@ single_active_consumer_mark_waiting_consumers_as_suspected_when_down_noconnnecti end, State2#state.waiting_consumers), % simulate node goes back up - {State3, _, _} = apply(#{}, {nodeup, node(self())}, State2), + {State3, _, _} = apply(#{index => 2}, {nodeup, node(self())}, State2), % all the waiting consumers should be un-suspected ?assertEqual(3, length(State3#state.waiting_consumers)), @@ -2281,10 +2385,11 @@ single_active_consumer_state_enter_leader_include_waiting_consumers_test() -> Pid2 = spawn(DummyFunction), Pid3 = spawn(DummyFunction), + Meta = #{index => 1}, % adding some consumers AddConsumer = fun({CTag, ChannelId}, State) -> {NewState, _, _} = apply( - #{}, + Meta, #checkout{spec = {once, 1, simple_prefetch}, meta = #{}, consumer_id = {CTag, ChannelId}}, @@ -2310,10 +2415,11 @@ single_active_consumer_state_enter_eol_include_waiting_consumers_test() -> Pid2 = spawn(DummyFunction), Pid3 = spawn(DummyFunction), + Meta = #{index => 1}, % adding some consumers AddConsumer = fun({CTag, ChannelId}, State) -> {NewState, _, _} = apply( - #{}, + Meta, #checkout{spec = {once, 1, simple_prefetch}, meta = #{}, consumer_id = {CTag, ChannelId}}, @@ -2333,11 +2439,11 @@ query_consumers_test() -> atom_to_binary(?FUNCTION_NAME, utf8)), shadow_copy_interval => 0, single_active_consumer_on => true}), - + Meta = #{index => 1}, % adding some consumers AddConsumer = fun(CTag, State) -> {NewState, _, _} = apply( - #{}, + Meta, #checkout{spec = {once, 1, simple_prefetch}, meta = #{}, consumer_id = {CTag, self()}}, diff --git a/src/rabbit_fifo_index.erl b/src/rabbit_fifo_index.erl index f8f414f453..184002611e 100644 --- a/src/rabbit_fifo_index.erl +++ b/src/rabbit_fifo_index.erl @@ -4,12 +4,14 @@ empty/0, fetch/2, append/3, + update_if_present/3, return/3, delete/2, size/1, smallest/1, next_key_after/2, - map/2 + map/2, + to_map/1 ]). -include_lib("ra/include/ra.hrl"). @@ -36,12 +38,22 @@ fetch(Key, #?MODULE{data = Data}) -> -spec append(integer(), term(), state()) -> state(). append(Key, Value, #?MODULE{data = Data, - smallest = Smallest, - largest = Largest} = State) + smallest = Smallest, + largest = Largest} = State) when Key > Largest orelse Largest =:= undefined -> State#?MODULE{data = maps:put(Key, Value, Data), - smallest = ra_lib:default(Smallest, Key), - largest = Key}. + smallest = ra_lib:default(Smallest, Key), + largest = Key}. + +-spec update_if_present(integer(), term(), state()) -> state(). +update_if_present(Key, Value, #?MODULE{data = Data} = State) -> + case Data of + #{Key := _} -> + State#?MODULE{data = maps:put(Key, Value, Data)}; + _ -> + State + end. + -spec return(integer(), term(), state()) -> state(). return(Key, Value, #?MODULE{data = Data, smallest = Smallest} = State) @@ -76,6 +88,10 @@ delete(Key, #?MODULE{data = Data} = State) -> size(#?MODULE{data = Data}) -> maps:size(Data). +-spec to_map(state()) -> #{integer() => term()}. +to_map(#?MODULE{data = Data}) -> + Data. + -spec smallest(state()) -> undefined | {integer(), term()}. smallest(#?MODULE{smallest = undefined}) -> undefined; diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index 5c0d1c0070..1f3aa1f38b 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -152,10 +152,15 @@ ra_machine(Q) -> ra_machine_config(Q = #amqqueue{name = QName, pid = {Name, _}}) -> + %% take the minimum value of the policy and the queue arg if present + MaxLength = args_policy_lookup(<<"max-length">>, fun min/2, Q), + MaxBytes = args_policy_lookup(<<"max-length-bytes">>, fun min/2, Q), #{name => Name, queue_resource => QName, dead_letter_handler => dlx_mfa(Q), become_leader_handler => {?MODULE, become_leader, [QName]}, + max_length => MaxLength, + max_bytes => MaxBytes, single_active_consumer_on => single_active_consumer_on(Q)}. single_active_consumer_on(#amqqueue{arguments = QArguments}) -> @@ -636,8 +641,10 @@ delete_member(#amqqueue{pid = {RaName, _}, name = QName}, Node) -> %%---------------------------------------------------------------------------- dlx_mfa(Q) -> - DLX = init_dlx(args_policy_lookup(<<"dead-letter-exchange">>, fun res_arg/2, Q), Q), - DLXRKey = args_policy_lookup(<<"dead-letter-routing-key">>, fun res_arg/2, Q), + DLX = init_dlx(args_policy_lookup(<<"dead-letter-exchange">>, + fun res_arg/2, Q), Q), + DLXRKey = args_policy_lookup(<<"dead-letter-routing-key">>, + fun res_arg/2, Q), {?MODULE, dead_letter_publish, [DLX, DLXRKey, Q#amqqueue.name]}. init_dlx(undefined, _Q) -> @@ -829,9 +836,8 @@ qnode({_, Node}) -> Node. check_invalid_arguments(QueueName, Args) -> - Keys = [<<"x-expires">>, <<"x-message-ttl">>, <<"x-max-length">>, - <<"x-max-length-bytes">>, <<"x-max-priority">>, <<"x-overflow">>, - <<"x-queue-mode">>], + Keys = [<<"x-expires">>, <<"x-message-ttl">>, + <<"x-max-priority">>, <<"x-queue-mode">>, <<"x-overflow">>], [case rabbit_misc:table_lookup(Args, Key) of undefined -> ok; _TypeVal -> rabbit_misc:protocol_error( diff --git a/test/dynamic_qq_SUITE.erl b/test/dynamic_qq_SUITE.erl index d1158ef07a..3357e6f74b 100644 --- a/test/dynamic_qq_SUITE.erl +++ b/test/dynamic_qq_SUITE.erl @@ -135,9 +135,8 @@ force_delete_if_no_consensus(Config) -> passive = true})), %% TODO implement a force delete BCh2 = rabbit_ct_client_helpers:open_channel(Config, B), - ?assertExit({{shutdown, - {connection_closing, {server_initiated_close, 541, _}}}, _}, - amqp_channel:call(BCh2, #'queue.delete'{queue = QName})), + ?assertMatch(#'queue.delete_ok'{}, + amqp_channel:call(BCh2, #'queue.delete'{queue = QName})), ok. takeover_on_failure(Config) -> diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl index dcba910a6a..48dac3ca57 100644 --- a/test/quorum_queue_SUITE.erl +++ b/test/quorum_queue_SUITE.erl @@ -22,6 +22,7 @@ -import(quorum_queue_utils, [wait_for_messages_ready/3, wait_for_messages_pending_ack/3, + wait_for_messages_total/3, dirty_query/3, ra_name/1]). @@ -37,6 +38,7 @@ all() -> groups() -> [ {single_node, [], all_tests()}, + {single_node, [], memory_tests()}, {unclustered, [], [ {cluster_size_2, [], [add_member]} ]}, @@ -51,6 +53,7 @@ groups() -> delete_member_not_found, delete_member] ++ all_tests()}, + {cluster_size_2, [], memory_tests()}, {cluster_size_3, [], [ declare_during_node_down, simple_confirm_availability_on_leader_change, @@ -61,7 +64,8 @@ groups() -> delete_declare, metrics_cleanup_on_leadership_takeover, metrics_cleanup_on_leader_crash, - consume_in_minority]}, + consume_in_minority + ]}, {cluster_size_5, [], [start_queue, start_queue_concurrent, quorum_cluster_size_3, @@ -126,6 +130,11 @@ all_tests() -> consume_redelivery_count, subscribe_redelivery_count, message_bytes_metrics, + queue_length_limit_drop_head + ]. + +memory_tests() -> + [ memory_alarm_rolls_wal ]. @@ -240,7 +249,9 @@ declare_args(Config) -> Ch = rabbit_ct_client_helpers:open_channel(Config, Server), LQ = ?config(queue_name, Config), - declare(Ch, LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}]), + declare(Ch, LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, + {<<"x-max-length">>, long, 2000}, + {<<"x-max-length-bytes">>, long, 2000}]), assert_queue_type(Server, LQ, quorum), DQ = <<"classic-declare-args-q">>, @@ -293,16 +304,6 @@ declare_invalid_args(Config) -> declare(rabbit_ct_client_helpers:open_channel(Config, Server), LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, {<<"x-message-ttl">>, long, 2000}])), - ?assertExit( - {{shutdown, {server_initiated_close, 406, _}}, _}, - declare(rabbit_ct_client_helpers:open_channel(Config, Server), - LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, - {<<"x-max-length">>, long, 2000}])), - ?assertExit( - {{shutdown, {server_initiated_close, 406, _}}, _}, - declare(rabbit_ct_client_helpers:open_channel(Config, Server), - LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, - {<<"x-max-length-bytes">>, long, 2000}])), ?assertExit( {{shutdown, {server_initiated_close, 406, _}}, _}, @@ -314,7 +315,7 @@ declare_invalid_args(Config) -> {{shutdown, {server_initiated_close, 406, _}}, _}, declare(rabbit_ct_client_helpers:open_channel(Config, Server), LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, - {<<"x-overflow">>, longstr, <<"drop-head">>}])), + {<<"x-overflow">>, longstr, <<"reject-publish">>}])), ?assertExit( {{shutdown, {server_initiated_close, 406, _}}, _}, @@ -1422,7 +1423,7 @@ metrics_cleanup_on_leadership_takeover(Config) -> _ -> false end end), - force_leader_change(Leader, Servers, QQ), + force_leader_change(Servers, QQ), wait_until(fun () -> [] =:= rpc:call(Leader, ets, lookup, [queue_coarse_metrics, QRes]) andalso [] =:= rpc:call(Leader, ets, lookup, [queue_metrics, QRes]) @@ -2151,6 +2152,32 @@ memory_alarm_rolls_wal(Config) -> timer:sleep(1000), ok. +queue_length_limit_drop_head(Config) -> + [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, + {<<"x-max-length">>, long, 1}])), + + RaName = ra_name(QQ), + ok = amqp_channel:cast(Ch, + #'basic.publish'{routing_key = QQ}, + #amqp_msg{props = #'P_basic'{delivery_mode = 2}, + payload = <<"msg1">>}), + ok = amqp_channel:cast(Ch, + #'basic.publish'{routing_key = QQ}, + #amqp_msg{props = #'P_basic'{delivery_mode = 2}, + payload = <<"msg2">>}), + wait_for_consensus(QQ, Config), + wait_for_messages_ready(Servers, RaName, 1), + wait_for_messages_pending_ack(Servers, RaName, 0), + wait_for_messages_total(Servers, RaName, 1), + ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"msg2">>}}, + amqp_channel:call(Ch, #'basic.get'{queue = QQ, + no_ack = true})). + %%---------------------------------------------------------------------------- declare(Ch, Q) -> @@ -2201,6 +2228,9 @@ filter_queues(Expected, Got) -> lists:member(K, Keys) end, Got). +publish_many(Ch, Queue, Count) -> + [publish(Ch, Queue) || _ <- lists:seq(1, Count)]. + publish(Ch, Queue) -> ok = amqp_channel:cast(Ch, #'basic.publish'{routing_key = Queue}, @@ -2268,14 +2298,16 @@ wait_until(Condition, N) -> wait_until(Condition, N - 1) end. -force_leader_change(Leader, Servers, Q) -> + +force_leader_change([Server | _] = Servers, Q) -> RaName = ra_name(Q), + {ok, _, {_, Leader}} = ra:members({RaName, Server}), [F1, _] = Servers -- [Leader], ok = rpc:call(F1, ra, trigger_election, [{RaName, F1}]), case ra:members({RaName, Leader}) of {ok, _, {_, Leader}} -> %% Leader has been re-elected - force_leader_change(Leader, Servers, Q); + force_leader_change(Servers, Q); {ok, _, _} -> %% Leader has changed ok @@ -2297,3 +2329,8 @@ get_message_bytes(Leader, QRes) -> _ -> [] end. + +wait_for_consensus(Name, Config) -> + Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + RaName = ra_name(Name), + {ok, _, _} = ra:members({RaName, Server}). diff --git a/test/quorum_queue_utils.erl b/test/quorum_queue_utils.erl index a216c220e6..6b820c7b5c 100644 --- a/test/quorum_queue_utils.erl +++ b/test/quorum_queue_utils.erl @@ -5,6 +5,7 @@ -export([ wait_for_messages_ready/3, wait_for_messages_pending_ack/3, + wait_for_messages_total/3, dirty_query/3, ra_name/1 ]). @@ -17,6 +18,10 @@ wait_for_messages_pending_ack(Servers, QName, Ready) -> wait_for_messages(Servers, QName, Ready, fun rabbit_fifo:query_messages_checked_out/1, 60). +wait_for_messages_total(Servers, QName, Total) -> + wait_for_messages(Servers, QName, Total, + fun rabbit_fifo:query_messages_total/1, 60). + wait_for_messages(Servers, QName, Number, Fun, 0) -> Msgs = dirty_query(Servers, QName, Fun), Totals = lists:map(fun(M) when is_map(M) -> @@ -28,8 +33,8 @@ wait_for_messages(Servers, QName, Number, Fun, 0) -> wait_for_messages(Servers, QName, Number, Fun, N) -> Msgs = dirty_query(Servers, QName, Fun), ct:pal("Got messages ~p", [Msgs]), - case lists:all(fun(M) when is_map(M) -> - maps:size(M) == Number; + case lists:all(fun(C) when is_integer(C) -> + C == Number; (_) -> false end, Msgs) of diff --git a/test/rabbit_fifo_SUITE.erl b/test/rabbit_fifo_SUITE.erl index 0512e8161a..9f1f3a4797 100644 --- a/test/rabbit_fifo_SUITE.erl +++ b/test/rabbit_fifo_SUITE.erl @@ -300,6 +300,7 @@ returns_after_down(Config) -> Self ! checkout_done end), receive checkout_done -> ok after 1000 -> exit(checkout_done_timeout) end, + timer:sleep(1000), % message should be available for dequeue {ok, {_, {_, msg1}}, _} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F2), ra:stop_server(ServerId), @@ -481,15 +482,15 @@ test_queries(Config) -> F0 = rabbit_fifo_client:init(ClusterName, [ServerId], 4), {ok, _} = rabbit_fifo_client:checkout(<<"tag">>, 1, undefined, F0), {ok, {_, Ready}, _} = ra:local_query(ServerId, - fun rabbit_fifo:query_messages_ready/1), - ?assertEqual(1, maps:size(Ready)), + fun rabbit_fifo:query_messages_ready/1), + ?assertEqual(1, Ready), ct:pal("Ready ~w~n", [Ready]), {ok, {_, Checked}, _} = ra:local_query(ServerId, - fun rabbit_fifo:query_messages_checked_out/1), - ?assertEqual(1, maps:size(Checked)), + fun rabbit_fifo:query_messages_checked_out/1), + ?assertEqual(1, Checked), ct:pal("Checked ~w~n", [Checked]), {ok, {_, Processes}, _} = ra:local_query(ServerId, - fun rabbit_fifo:query_processes/1), + fun rabbit_fifo:query_processes/1), ct:pal("Processes ~w~n", [Processes]), ?assertEqual(2, length(Processes)), P ! stop, diff --git a/test/rabbit_fifo_prop_SUITE.erl b/test/rabbit_fifo_prop_SUITE.erl index a8604b46af..5643da1991 100644 --- a/test/rabbit_fifo_prop_SUITE.erl +++ b/test/rabbit_fifo_prop_SUITE.erl @@ -25,7 +25,17 @@ all_tests() -> scenario1, scenario2, scenario3, - scenario4 + scenario4, + scenario5, + scenario6, + scenario7, + scenario8, + scenario9, + scenario10, + scenario11, + scenario12, + scenario13, + scenario14 ]. groups() -> @@ -73,7 +83,7 @@ scenario1(_Config) -> make_return(C2, [1]), %% E2 in returns E1 with C2 make_settle(C2, [2]) %% E2 with C2 ], - run_snapshot_test(?FUNCTION_NAME, Commands), + run_snapshot_test(#{name => ?FUNCTION_NAME}, Commands), ok. scenario2(_Config) -> @@ -88,7 +98,7 @@ scenario2(_Config) -> make_settle(C1, [0]), make_settle(C2, [0]) ], - run_snapshot_test(?FUNCTION_NAME, Commands), + run_snapshot_test(#{name => ?FUNCTION_NAME}, Commands), ok. scenario3(_Config) -> @@ -102,7 +112,7 @@ scenario3(_Config) -> make_settle(C1, [1]), make_settle(C1, [2]) ], - run_snapshot_test(?FUNCTION_NAME, Commands), + run_snapshot_test(#{name => ?FUNCTION_NAME}, Commands), ok. scenario4(_Config) -> @@ -112,19 +122,147 @@ scenario4(_Config) -> make_enqueue(E,1,msg), make_settle(C1, [0]) ], - run_snapshot_test(?FUNCTION_NAME, Commands), + run_snapshot_test(#{name => ?FUNCTION_NAME}, Commands), + ok. + +scenario5(_Config) -> + C1 = {<<>>, c:pid(0,505,0)}, + E = c:pid(0,465,9), + Commands = [make_enqueue(E,1,<<0>>), + make_checkout(C1, {auto,1,simple_prefetch}), + make_enqueue(E,2,<<>>), + make_settle(C1,[0])], + run_snapshot_test(#{name => ?FUNCTION_NAME}, Commands), + ok. + +scenario6(_Config) -> + E = c:pid(0,465,9), + Commands = [make_enqueue(E,1,<<>>), %% 1 msg on queue - snap: prefix 1 + make_enqueue(E,2,<<>>) %% 1. msg on queue - snap: prefix 1 + ], + run_snapshot_test(#{name => ?FUNCTION_NAME, + max_length => 1}, Commands), + ok. + +scenario7(_Config) -> + C1 = {<<>>, c:pid(0,208,0)}, + E = c:pid(0,188,0), + Commands = [ + make_enqueue(E,1,<<>>), + make_checkout(C1, {auto,1,simple_prefetch}), + make_enqueue(E,2,<<>>), + make_enqueue(E,3,<<>>), + make_settle(C1,[0])], + run_snapshot_test(#{name => ?FUNCTION_NAME, + max_length => 1}, Commands), + ok. + +scenario8(_Config) -> + C1 = {<<>>, c:pid(0,208,0)}, + E = c:pid(0,188,0), + Commands = [ + make_enqueue(E,1,<<>>), + make_enqueue(E,2,<<>>), + make_checkout(C1, {auto,1,simple_prefetch}), + % make_checkout(C1, cancel), + {down, E, noconnection}, + make_settle(C1, [0])], + run_snapshot_test(#{name => ?FUNCTION_NAME, + max_length => 1}, Commands), + ok. + +scenario9(_Config) -> + E = c:pid(0,188,0), + Commands = [ + make_enqueue(E,1,<<>>), + make_enqueue(E,2,<<>>), + make_enqueue(E,3,<<>>)], + run_snapshot_test(#{name => ?FUNCTION_NAME, + max_length => 1}, Commands), + ok. + +scenario10(_Config) -> + C1 = {<<>>, c:pid(0,208,0)}, + E = c:pid(0,188,0), + Commands = [ + make_checkout(C1, {auto,1,simple_prefetch}), + make_enqueue(E,1,<<>>), + make_settle(C1, [0]) + ], + run_snapshot_test(#{name => ?FUNCTION_NAME, + max_length => 1}, Commands), + ok. + +scenario11(_Config) -> + C1 = {<<>>, c:pid(0,215,0)}, + E = c:pid(0,217,0), + Commands = [ + make_enqueue(E,1,<<>>), + make_checkout(C1, {auto,1,simple_prefetch}), + make_checkout(C1, cancel), + make_enqueue(E,2,<<>>), + make_checkout(C1, {auto,1,simple_prefetch}), + make_settle(C1, [0]), + make_checkout(C1, cancel) + ], + run_snapshot_test(#{name => ?FUNCTION_NAME, + max_length => 2}, Commands), + ok. + +scenario12(_Config) -> + E = c:pid(0,217,0), + Commands = [make_enqueue(E,1,<<0>>), + make_enqueue(E,2,<<0>>), + make_enqueue(E,3,<<0>>)], + run_snapshot_test(#{name => ?FUNCTION_NAME, + max_bytes => 2}, Commands), + ok. + +scenario13(_Config) -> + E = c:pid(0,217,0), + Commands = [make_enqueue(E,1,<<0>>), + make_enqueue(E,2,<<>>), + make_enqueue(E,3,<<>>), + make_enqueue(E,4,<<>>) + ], + run_snapshot_test(#{name => ?FUNCTION_NAME, + max_length => 2}, Commands), + ok. + +scenario14(_Config) -> + E = c:pid(0,217,0), + Commands = [make_enqueue(E,1,<<0,0>>)], + run_snapshot_test(#{name => ?FUNCTION_NAME, + max_bytes => 1}, Commands), ok. snapshots(_Config) -> run_proper( fun () -> - ?FORALL(O, ?LET(Ops, log_gen(), expand(Ops)), - test1_prop(O)) - end, [], 1000). - -test1_prop(Commands) -> - ct:pal("Commands: ~p~n", [Commands]), - try run_snapshot_test(?FUNCTION_NAME, Commands) of + ?FORALL({Length, Bytes, SingleActiveConsumer}, + frequency([{10, {0, 0, false}}, + {5, {non_neg_integer(), non_neg_integer(), + boolean()}}]), + ?FORALL(O, ?LET(Ops, log_gen(200), expand(Ops)), + collect({Length, Bytes}, + snapshots_prop( + config(?FUNCTION_NAME, + Length, Bytes, + SingleActiveConsumer), O)))) + end, [], 2000). + +config(Name, Length, Bytes, SingleActive) -> + #{name => Name, + max_length => map_max(Length), + max_bytes => map_max(Bytes), + single_active_consumer_on => SingleActive}. + +map_max(0) -> undefined; +map_max(N) -> N. + +snapshots_prop(Conf, Commands) -> + ct:pal("Commands: ~p~nConf~p~n", [Commands, Conf]), + try run_snapshot_test(Conf, Commands) of _ -> true catch Err -> @@ -132,10 +270,10 @@ test1_prop(Commands) -> false end. -log_gen() -> +log_gen(Size) -> ?LET(EPids, vector(2, pid_gen()), ?LET(CPids, vector(2, pid_gen()), - resize(200, + resize(Size, list( frequency( [{20, enqueue_gen(oneof(EPids))}, @@ -157,15 +295,17 @@ down_gen(Pid) -> ?LET(E, {down, Pid, oneof([noconnection, noproc])}, E). enqueue_gen(Pid) -> - ?LET(E, {enqueue, Pid, frequency([{10, enqueue}, - {1, delay}])}, E). + ?LET(E, {enqueue, Pid, + frequency([{10, enqueue}, + {1, delay}]), + binary()}, E). checkout_cancel_gen(Pid) -> {checkout, Pid, cancel}. checkout_gen(Pid) -> %% pid, tag, prefetch - ?LET(C, {checkout, {binary(), Pid}, choose(1, 10)}, C). + ?LET(C, {checkout, {binary(), Pid}, choose(1, 100)}, C). -record(t, {state = rabbit_fifo:init(#{name => proper, @@ -193,9 +333,10 @@ expand(Ops) -> lists:reverse(Log). -handle_op({enqueue, Pid, When}, #t{enqueuers = Enqs0, - down = Down, - effects = Effs} = T) -> +handle_op({enqueue, Pid, When, Data}, + #t{enqueuers = Enqs0, + down = Down, + effects = Effs} = T) -> case Down of #{Pid := noproc} -> %% if it's a noproc then it cannot exist - can it? @@ -204,13 +345,12 @@ handle_op({enqueue, Pid, When}, #t{enqueuers = Enqs0, _ -> Enqs = maps:update_with(Pid, fun (Seq) -> Seq + 1 end, 1, Enqs0), MsgSeq = maps:get(Pid, Enqs), - Cmd = rabbit_fifo:make_enqueue(Pid, MsgSeq, msg), + Cmd = rabbit_fifo:make_enqueue(Pid, MsgSeq, Data), case When of enqueue -> do_apply(Cmd, T#t{enqueuers = Enqs}); delay -> %% just put the command on the effects queue - ct:pal("delaying ~w", [Cmd]), T#t{effects = queue:in(Cmd, Effs)} end end; @@ -308,7 +448,6 @@ enq_effs([{send_msg, P, {delivery, CTag, Msgs}, ra_event} | Rem], Q) -> Cmd = rabbit_fifo:make_settle({CTag, P}, MsgIds), enq_effs(Rem, queue:in(Cmd, Q)); enq_effs([_ | Rem], Q) -> - % ct:pal("enq_effs dropping ~w~n", [E]), enq_effs(Rem, Q). @@ -323,29 +462,40 @@ run_proper(Fun, Args, NumTests) -> (F, A) -> ct:pal(?LOW_IMPORTANCE, F, A) end}])). -run_snapshot_test(Name, Commands) -> +run_snapshot_test(Conf, Commands) -> %% create every incremental permuation of the commands lists %% and run the snapshot tests against that [begin % ?debugFmt("~w running command to ~w~n", [?FUNCTION_NAME, lists:last(C)]), - run_snapshot_test0(Name, C) + run_snapshot_test0(Conf, C) end || C <- prefixes(Commands, 1, [])]. -run_snapshot_test0(Name, Commands) -> +run_snapshot_test0(Conf, Commands) -> Indexes = lists:seq(1, length(Commands)), Entries = lists:zip(Indexes, Commands), - {State, Effects} = run_log(test_init(Name), Entries), + {State, Effects} = run_log(test_init(Conf), Entries), + % ct:pal("beginning snapshot test run for ~w numn commands ~b", + % [maps:get(name, Conf), length(Commands)]), [begin + %% drop all entries below and including the snapshot Filtered = lists:dropwhile(fun({X, _}) when X =< SnapIdx -> true; (_) -> false end, Entries), {S, _} = run_log(SnapState, Filtered), % assert log can be restored from any release cursor index - ?assertEqual(State, S) + case S of + State -> ok; + _ -> + ct:pal("Snapshot tests failed run log:~n" + "~p~n from ~n~p~n Entries~n~p~n", + [Filtered, SnapState, Entries]), + ?assertEqual(State, S) + end end || {release_cursor, SnapIdx, SnapState} <- Effects], ok. +%% transforms [1,2,3] into [[1,2,3], [1,2], [1]] prefixes(Source, N, Acc) when N > length(Source) -> lists:reverse(Acc); prefixes(Source, N, Acc) -> @@ -364,11 +514,12 @@ run_log(InitState, Entries) -> end end, {InitState, []}, Entries). -test_init(Name) -> - rabbit_fifo:init(#{name => Name, - queue_resource => blah, - shadow_copy_interval => 0, - metrics_handler => {?MODULE, metrics_handler, []}}). +test_init(Conf) -> + Default = #{queue_resource => blah, + shadow_copy_interval => 0, + metrics_handler => {?MODULE, metrics_handler, []}}, + rabbit_fifo:init(maps:merge(Default, Conf)). + meta(Idx) -> #{index => Idx, term => 1}. diff --git a/test/unit_inbroker_parallel_SUITE.erl b/test/unit_inbroker_parallel_SUITE.erl index 0343e7d136..581440d179 100644 --- a/test/unit_inbroker_parallel_SUITE.erl +++ b/test/unit_inbroker_parallel_SUITE.erl @@ -35,9 +35,15 @@ all() -> ]. groups() -> - MaxLengthTests = [max_length_drop_head, + MaxLengthTests = [max_length_default, + max_length_bytes_default, + max_length_drop_head, + max_length_bytes_drop_head, max_length_reject_confirm, - max_length_drop_publish], + max_length_bytes_reject_confirm, + max_length_drop_publish, + max_length_drop_publish_requeue, + max_length_bytes_drop_publish], [ {parallel_tests, [parallel], [ amqp_connection_refusal, @@ -59,11 +65,16 @@ groups() -> set_disk_free_limit_command, set_vm_memory_high_watermark_command, topic_matching, + max_message_size, + {queue_max_length, [], [ - {max_length_simple, [], MaxLengthTests}, - {max_length_mirrored, [], MaxLengthTests}]}, - max_message_size - ]} + {max_length_classic, [], MaxLengthTests}, + {max_length_quorum, [], [max_length_default, + max_length_bytes_default] + }, + {max_length_mirrored, [], MaxLengthTests} + ]} + ]} ]. suite() -> @@ -82,10 +93,23 @@ init_per_suite(Config) -> end_per_suite(Config) -> rabbit_ct_helpers:run_teardown_steps(Config). +init_per_group(max_length_classic, Config) -> + rabbit_ct_helpers:set_config( + Config, + [{queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]}, + {queue_durable, false}]); +init_per_group(max_length_quorum, Config) -> + rabbit_ct_helpers:set_config( + Config, + [{queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}]}, + {queue_durable, true}]); init_per_group(max_length_mirrored, Config) -> rabbit_ct_broker_helpers:set_ha_policy(Config, 0, <<"^max_length.*queue">>, <<"all">>, [{<<"ha-sync-mode">>, <<"automatic">>}]), - Config1 = rabbit_ct_helpers:set_config(Config, [{is_mirrored, true}]), + Config1 = rabbit_ct_helpers:set_config( + Config, [{is_mirrored, true}, + {queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]}, + {queue_durable, false}]), rabbit_ct_helpers:run_steps(Config1, []); init_per_group(Group, Config) -> case lists:member({group, Group}, all()) of @@ -132,29 +156,22 @@ end_per_group(Group, Config) -> end. init_per_testcase(Testcase, Config) -> - rabbit_ct_helpers:testcase_started(Config, Testcase). - -end_per_testcase(max_length_drop_head = Testcase, Config) -> + Group = proplists:get_value(name, ?config(tc_group_properties, Config)), + Q = rabbit_data_coercion:to_binary(io_lib:format("~p_~p", [Group, Testcase])), + Config1 = rabbit_ct_helpers:set_config(Config, [{queue_name, Q}]), + rabbit_ct_helpers:testcase_started(Config1, Testcase). + +end_per_testcase(Testcase, Config) + when Testcase == max_length_drop_publish; Testcase == max_length_bytes_drop_publish; + Testcase == max_length_drop_publish_requeue; + Testcase == max_length_reject_confirm; Testcase == max_length_bytes_reject_confirm; + Testcase == max_length_drop_head; Testcase == max_length_bytes_drop_head; + Testcase == max_length_default; Testcase == max_length_bytes_default -> {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), - amqp_channel:call(Ch, #'queue.delete'{queue = <<"max_length_drop_head_queue">>}), - amqp_channel:call(Ch, #'queue.delete'{queue = <<"max_length_default_drop_head_queue">>}), - amqp_channel:call(Ch, #'queue.delete'{queue = <<"max_length_bytes_drop_head_queue">>}), - amqp_channel:call(Ch, #'queue.delete'{queue = <<"max_length_bytes_default_drop_head_queue">>}), + amqp_channel:call(Ch, #'queue.delete'{queue = ?config(queue_name, Config)}), rabbit_ct_client_helpers:close_channels_and_connection(Config, 0), rabbit_ct_helpers:testcase_finished(Config, Testcase); -end_per_testcase(max_length_reject_confirm = Testcase, Config) -> - {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), - amqp_channel:call(Ch, #'queue.delete'{queue = <<"max_length_reject_queue">>}), - amqp_channel:call(Ch, #'queue.delete'{queue = <<"max_length_bytes_reject_queue">>}), - rabbit_ct_client_helpers:close_channels_and_connection(Config, 0), - rabbit_ct_helpers:testcase_finished(Config, Testcase); -end_per_testcase(max_length_drop_publish = Testcase, Config) -> - {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), - amqp_channel:call(Ch, #'queue.delete'{queue = <<"max_length_drop_publish_queue">>}), - amqp_channel:call(Ch, #'queue.delete'{queue = <<"max_length_bytes_drop_publish_queue">>}), - rabbit_ct_client_helpers:close_channels_and_connection(Config, 0), - rabbit_ct_helpers:testcase_finished(Config, Testcase); end_per_testcase(Testcase, Config) -> rabbit_ct_helpers:testcase_finished(Config, Testcase). @@ -1159,43 +1176,66 @@ set_vm_memory_high_watermark_command1(_Config) -> ) end. -max_length_drop_head(Config) -> +max_length_bytes_drop_head(Config) -> + max_length_bytes_drop_head(Config, [{<<"x-overflow">>, longstr, <<"drop-head">>}]). + +max_length_bytes_default(Config) -> + max_length_bytes_drop_head(Config, []). + +max_length_bytes_drop_head(Config, ExtraArgs) -> {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), - QName = <<"max_length_drop_head_queue">>, - QNameDefault = <<"max_length_default_drop_head_queue">>, - QNameBytes = <<"max_length_bytes_drop_head_queue">>, - QNameDefaultBytes = <<"max_length_bytes_default_drop_head_queue">>, + Args = ?config(queue_args, Config), + Durable = ?config(queue_durable, Config), + QName = ?config(queue_name, Config), - MaxLengthArgs = [{<<"x-max-length">>, long, 1}], MaxLengthBytesArgs = [{<<"x-max-length-bytes">>, long, 100}], - OverflowArgs = [{<<"x-overflow">>, longstr, <<"drop-head">>}], - #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = MaxLengthArgs ++ OverflowArgs}), - #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QNameDefault, arguments = MaxLengthArgs}), - #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QNameBytes, arguments = MaxLengthBytesArgs ++ OverflowArgs}), - #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QNameDefaultBytes, arguments = MaxLengthBytesArgs}), - - check_max_length_drops_head(Config, QName, Ch, <<"1">>, <<"2">>, <<"3">>), - check_max_length_drops_head(Config, QNameDefault, Ch, <<"1">>, <<"2">>, <<"3">>), + #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = MaxLengthBytesArgs ++ Args ++ ExtraArgs, durable = Durable}), %% 80 bytes payload Payload1 = << <<"1">> || _ <- lists:seq(1, 80) >>, Payload2 = << <<"2">> || _ <- lists:seq(1, 80) >>, Payload3 = << <<"3">> || _ <- lists:seq(1, 80) >>, - check_max_length_drops_head(Config, QName, Ch, Payload1, Payload2, Payload3), - check_max_length_drops_head(Config, QNameDefault, Ch, Payload1, Payload2, Payload3). + check_max_length_drops_head(Config, QName, Ch, Payload1, Payload2, Payload3). + +max_length_drop_head(Config) -> + max_length_drop_head(Config, [{<<"x-overflow">>, longstr, <<"drop-head">>}]). + +max_length_default(Config) -> + %% Defaults to drop_head + max_length_drop_head(Config, []). + +max_length_drop_head(Config, ExtraArgs) -> + {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + Args = ?config(queue_args, Config), + Durable = ?config(queue_durable, Config), + QName = ?config(queue_name, Config), + + MaxLengthArgs = [{<<"x-max-length">>, long, 1}], + #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = MaxLengthArgs ++ Args ++ ExtraArgs, durable = Durable}), + + check_max_length_drops_head(Config, QName, Ch, <<"1">>, <<"2">>, <<"3">>). max_length_reject_confirm(Config) -> {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), - QName = <<"max_length_reject_queue">>, - QNameBytes = <<"max_length_bytes_reject_queue">>, + Args = ?config(queue_args, Config), + QName = ?config(queue_name, Config), + Durable = ?config(queue_durable, Config), MaxLengthArgs = [{<<"x-max-length">>, long, 1}], - MaxLengthBytesArgs = [{<<"x-max-length-bytes">>, long, 100}], OverflowArgs = [{<<"x-overflow">>, longstr, <<"reject-publish">>}], - #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = MaxLengthArgs ++ OverflowArgs}), - #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QNameBytes, arguments = MaxLengthBytesArgs ++ OverflowArgs}), + #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = MaxLengthArgs ++ OverflowArgs ++ Args, durable = Durable}), #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), check_max_length_drops_publish(Config, QName, Ch, <<"1">>, <<"2">>, <<"3">>), - check_max_length_rejects(Config, QName, Ch, <<"1">>, <<"2">>, <<"3">>), + check_max_length_rejects(Config, QName, Ch, <<"1">>, <<"2">>, <<"3">>). + +max_length_bytes_reject_confirm(Config) -> + {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + Args = ?config(queue_args, Config), + QNameBytes = ?config(queue_name, Config), + Durable = ?config(queue_durable, Config), + MaxLengthBytesArgs = [{<<"x-max-length-bytes">>, long, 100}], + OverflowArgs = [{<<"x-overflow">>, longstr, <<"reject-publish">>}], + #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QNameBytes, arguments = MaxLengthBytesArgs ++ OverflowArgs ++ Args, durable = Durable}), + #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), %% 80 bytes payload Payload1 = << <<"1">> || _ <- lists:seq(1, 80) >>, @@ -1207,15 +1247,55 @@ max_length_reject_confirm(Config) -> max_length_drop_publish(Config) -> {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), - QName = <<"max_length_drop_publish_queue">>, - QNameBytes = <<"max_length_bytes_drop_publish_queue">>, + Args = ?config(queue_args, Config), + Durable = ?config(queue_durable, Config), + QName = ?config(queue_name, Config), MaxLengthArgs = [{<<"x-max-length">>, long, 1}], - MaxLengthBytesArgs = [{<<"x-max-length-bytes">>, long, 100}], OverflowArgs = [{<<"x-overflow">>, longstr, <<"reject-publish">>}], - #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = MaxLengthArgs ++ OverflowArgs}), - #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QNameBytes, arguments = MaxLengthBytesArgs ++ OverflowArgs}), + #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = MaxLengthArgs ++ OverflowArgs ++ Args, durable = Durable}), %% If confirms are not enable, publishes will still be dropped in reject-publish mode. - check_max_length_drops_publish(Config, QName, Ch, <<"1">>, <<"2">>, <<"3">>), + check_max_length_drops_publish(Config, QName, Ch, <<"1">>, <<"2">>, <<"3">>). + +max_length_drop_publish_requeue(Config) -> + {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + Args = ?config(queue_args, Config), + Durable = ?config(queue_durable, Config), + QName = ?config(queue_name, Config), + MaxLengthArgs = [{<<"x-max-length">>, long, 1}], + OverflowArgs = [{<<"x-overflow">>, longstr, <<"reject-publish">>}], + #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = MaxLengthArgs ++ OverflowArgs ++ Args, durable = Durable}), + %% If confirms are not enable, publishes will still be dropped in reject-publish mode. + check_max_length_requeue(Config, QName, Ch, <<"1">>, <<"2">>). + +check_max_length_requeue(Config, QName, Ch, Payload1, Payload2) -> + sync_mirrors(QName, Config), + #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), + %% A single message is published and consumed + amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}), + wait_for_consensus(QName, Config), + {#'basic.get_ok'{delivery_tag = DeliveryTag}, + #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), + #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), + + %% Another message is published + amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}), + wait_for_consensus(QName, Config), + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, + multiple = false, + requeue = true}), + wait_for_consensus(QName, Config), + {#'basic.get_ok'{}, #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), + {#'basic.get_ok'{}, #amqp_msg{payload = Payload2}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), + #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}). + +max_length_bytes_drop_publish(Config) -> + {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + Args = ?config(queue_args, Config), + Durable = ?config(queue_durable, Config), + QNameBytes = ?config(queue_name, Config), + MaxLengthBytesArgs = [{<<"x-max-length-bytes">>, long, 100}], + OverflowArgs = [{<<"x-overflow">>, longstr, <<"reject-publish">>}], + #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QNameBytes, arguments = MaxLengthBytesArgs ++ OverflowArgs ++ Args, durable = Durable}), %% 80 bytes payload Payload1 = << <<"1">> || _ <- lists:seq(1, 80) >>, @@ -1229,22 +1309,38 @@ check_max_length_drops_publish(Config, QName, Ch, Payload1, Payload2, Payload3) #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), %% A single message is published and consumed amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}), + wait_for_consensus(QName, Config), {#'basic.get_ok'{}, #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), %% Message 2 is dropped, message 1 stays amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}), + wait_for_consensus(QName, Config), amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}), + wait_for_consensus(QName, Config), {#'basic.get_ok'{}, #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), %% Messages 2 and 3 are dropped, message 1 stays amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}), + wait_for_consensus(QName, Config), amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}), + wait_for_consensus(QName, Config), amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload3}), + wait_for_consensus(QName, Config), {#'basic.get_ok'{}, #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}). +wait_for_consensus(QName, Config) -> + case lists:keyfind(<<"x-queue-type">>, 1, ?config(queue_args, Config)) of + {_, _, <<"quorum">>} -> + Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + RaName = binary_to_atom(<<"%2F_", QName/binary>>, utf8), + {ok, _, _} = ra:members({RaName, Server}); + _ -> + ok + end. + check_max_length_rejects(Config, QName, Ch, Payload1, Payload2, Payload3) -> sync_mirrors(QName, Config), amqp_channel:register_confirm_handler(Ch, self()), @@ -1283,12 +1379,14 @@ check_max_length_drops_head(Config, QName, Ch, Payload1, Payload2, Payload3) -> #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), %% A single message is published and consumed amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}), + wait_for_consensus(QName, Config), {#'basic.get_ok'{}, #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), %% Message 1 is replaced by message 2 amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}), amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}), + wait_for_consensus(QName, Config), {#'basic.get_ok'{}, #amqp_msg{payload = Payload2}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), @@ -1296,6 +1394,7 @@ check_max_length_drops_head(Config, QName, Ch, Payload1, Payload2, Payload3) -> amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}), amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}), amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload3}), + wait_for_consensus(QName, Config), {#'basic.get_ok'{}, #amqp_msg{payload = Payload3}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}). |
