summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_fifo.erl548
-rw-r--r--src/rabbit_fifo_index.erl26
-rw-r--r--src/rabbit_quorum_queue.erl16
-rw-r--r--test/dynamic_qq_SUITE.erl5
-rw-r--r--test/quorum_queue_SUITE.erl69
-rw-r--r--test/quorum_queue_utils.erl9
-rw-r--r--test/rabbit_fifo_SUITE.erl11
-rw-r--r--test/rabbit_fifo_prop_SUITE.erl217
-rw-r--r--test/unit_inbroker_parallel_SUITE.erl207
9 files changed, 764 insertions, 344 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index b62ff62a51..9752e06e34 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -38,6 +38,7 @@
% queries
query_messages_ready/1,
query_messages_checked_out/1,
+ query_messages_total/1,
query_processes/1,
query_ra_indexes/1,
query_consumer_count/1,
@@ -87,8 +88,13 @@
-type msg() :: {msg_header(), raw_msg()}.
%% message with a header map.
+-type msg_size() :: non_neg_integer().
+%% the size in bytes of the msg payload
+
-type indexed_msg() :: {ra_index(), msg()}.
+-type prefix_msg() :: {'$prefix_msg', msg_size()}.
+
-type delivery_msg() :: {msg_id(), msg()}.
%% A tuple consisting of the message id and the headered message.
@@ -157,7 +163,7 @@
-type applied_mfa() :: {module(), atom(), list()}.
% represents a partially applied module call
--define(SHADOW_COPY_INTERVAL, 4096 * 4).
+-define(SHADOW_COPY_INTERVAL, 4096 * 8).
-define(USE_AVG_HALF_LIFE, 10000.0).
-record(consumer,
@@ -189,6 +195,7 @@
suspected_down = false :: boolean()
}).
+
-record(state,
{name :: atom(),
queue_resource :: rabbit_types:r('queue'),
@@ -202,7 +209,8 @@
next_msg_num = 1 :: msg_in_id(),
% list of returned msg_in_ids - when checking out it picks from
% this list first before taking low_msg_num
- returns = lqueue:new() :: lqueue:lqueue('$prefix_msg' | msg_in_id()),
+ returns = lqueue:new() :: lqueue:lqueue(prefix_msg() |
+ {msg_in_id(), indexed_msg()}),
% a counter of enqueues - used to trigger shadow copy points
enqueue_count = 0 :: non_neg_integer(),
% a map containing all the live processes that have ever enqueued
@@ -224,19 +232,20 @@
dead_letter_handler :: maybe(applied_mfa()),
become_leader_handler :: maybe(applied_mfa()),
%% This is a special field that is only used for snapshots
- %% It represents the number of queued messages at the time the
+ %% It represents the queued messages at the time the
%% dehydrated snapshot state was cached.
%% As release_cursors are only emitted for raft indexes where all
%% prior messages no longer contribute to the current state we can
- %% replace all message payloads at some index with a single integer
- %% to be decremented during `checkout_one' until it's 0 after which
- %% it instead takes messages from the `messages' map.
+ %% replace all message payloads with their sizes (to be used for
+ %% overflow calculations).
%% This is done so that consumers are still served in a deterministic
%% order on recovery.
- prefix_msg_counts = {0, 0} :: {Return :: non_neg_integer(),
- PrefixMsgs :: non_neg_integer()},
+ prefix_msgs = {[], []} :: {Return :: [msg_size()],
+ PrefixMsgs :: [msg_size()]},
msg_bytes_enqueue = 0 :: non_neg_integer(),
msg_bytes_checkout = 0 :: non_neg_integer(),
+ max_length :: maybe(non_neg_integer()),
+ max_bytes :: maybe(non_neg_integer()),
%% whether single active consumer is on or not for this queue
consumer_strategy = default :: default | single_active,
%% waiting consumers, one is picked active consumer is cancelled or dies
@@ -251,6 +260,8 @@
dead_letter_handler => applied_mfa(),
become_leader_handler => applied_mfa(),
shadow_copy_interval => non_neg_integer(),
+ max_length => non_neg_integer(),
+ max_bytes => non_neg_integer(),
single_active_consumer_on => boolean()}.
-export_type([protocol/0,
@@ -272,12 +283,14 @@
init(#{name := Name,
queue_resource := Resource} = Conf) ->
update_config(Conf, #state{name = Name,
- queue_resource = Resource}).
+ queue_resource = Resource}).
update_config(Conf, State) ->
DLH = maps:get(dead_letter_handler, Conf, undefined),
BLH = maps:get(become_leader_handler, Conf, undefined),
SHI = maps:get(shadow_copy_interval, Conf, ?SHADOW_COPY_INTERVAL),
+ MaxLength = maps:get(max_length, Conf, undefined),
+ MaxBytes = maps:get(max_bytes, Conf, undefined),
ConsumerStrategy = case maps:get(single_active_consumer_on, Conf, false) of
true ->
single_active;
@@ -287,6 +300,8 @@ update_config(Conf, State) ->
State#state{dead_letter_handler = DLH,
become_leader_handler = BLH,
shadow_copy_interval = SHI,
+ max_length = MaxLength,
+ max_bytes = MaxBytes,
consumer_strategy = ConsumerStrategy}.
zero(_) ->
@@ -294,59 +309,49 @@ zero(_) ->
% msg_ids are scoped per consumer
% ra_indexes holds all raft indexes for enqueues currently on queue
--spec apply(ra_machine:command_meta_data(), command(),
- state()) ->
- {state(), Reply :: term(), ra_machine:effects()}.
-apply(#{index := RaftIdx}, #enqueue{pid = From, seq = Seq,
- msg = RawMsg}, State00) ->
- case maybe_enqueue(RaftIdx, From, Seq, RawMsg, [], State00) of
- {ok, State0, Effects1} ->
- %% need to checkout before capturing the shadow copy else
- %% snapshots may not be complete
- {State, ok, Effects} = checkout(
- add_bytes_enqueue(RawMsg, State0),
- Effects1),
- append_to_master_index(RaftIdx, Effects, State);
- {duplicate, State, Effects} ->
- {State, ok, lists:reverse(Effects)}
- end;
-apply(#{index := RaftIdx},
+-spec apply(ra_machine:command_meta_data(), command(), state()) ->
+ {state(), Reply :: term(), ra_machine:effects()} |
+ {state(), Reply :: term()}.
+apply(Metadata, #enqueue{pid = From, seq = Seq,
+ msg = RawMsg}, State00) ->
+ apply_enqueue(Metadata, From, Seq, RawMsg, State00);
+apply(Meta,
#settle{msg_ids = MsgIds, consumer_id = ConsumerId},
#state{consumers = Cons0} = State) ->
case Cons0 of
#{ConsumerId := Con0} ->
% need to increment metrics before completing as any snapshot
% states taken need to includ them
- complete_and_checkout(RaftIdx, MsgIds, ConsumerId,
+ complete_and_checkout(Meta, MsgIds, ConsumerId,
Con0, [], State);
_ ->
{State, ok}
end;
-apply(#{index := RaftIdx}, #discard{msg_ids = MsgIds, consumer_id = ConsumerId},
+apply(Meta, #discard{msg_ids = MsgIds, consumer_id = ConsumerId},
#state{consumers = Cons0} = State0) ->
case Cons0 of
#{ConsumerId := Con0} ->
Discarded = maps:with(MsgIds, Con0#consumer.checked_out),
Effects = dead_letter_effects(Discarded, State0, []),
- complete_and_checkout(RaftIdx, MsgIds, ConsumerId, Con0,
+ complete_and_checkout(Meta, MsgIds, ConsumerId, Con0,
Effects, State0);
_ ->
{State0, ok}
end;
-apply(_, #return{msg_ids = MsgIds, consumer_id = ConsumerId},
+apply(Meta, #return{msg_ids = MsgIds, consumer_id = ConsumerId},
#state{consumers = Cons0} = State) ->
case Cons0 of
#{ConsumerId := Con0 = #consumer{checked_out = Checked0}} ->
Checked = maps:without(MsgIds, Checked0),
Returned = maps:with(MsgIds, Checked0),
MsgNumMsgs = maps:values(Returned),
- return(ConsumerId, MsgNumMsgs, Con0, Checked, [], State);
+ return(Meta, ConsumerId, MsgNumMsgs, Con0, Checked, [], State);
_ ->
{State, ok}
end;
-apply(_, #credit{credit = NewCredit, delivery_count = RemoteDelCnt,
- drain = Drain, consumer_id = ConsumerId},
+apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt,
+ drain = Drain, consumer_id = ConsumerId},
#state{consumers = Cons0,
service_queue = ServiceQueue0} = State0) ->
case Cons0 of
@@ -359,7 +364,7 @@ apply(_, #credit{credit = NewCredit, delivery_count = RemoteDelCnt,
ServiceQueue0),
Cons = maps:put(ConsumerId, Con1, Cons0),
{State1, ok, Effects} =
- checkout(State0#state{service_queue = ServiceQueue,
+ checkout(Meta, State0#state{service_queue = ServiceQueue,
consumers = Cons}, []),
Response = {send_credit_reply, maps:size(State1#state.messages)},
%% by this point all checkouts for the updated credit value
@@ -389,46 +394,45 @@ apply(_, #credit{credit = NewCredit, delivery_count = RemoteDelCnt,
%% credit for unknown consumer - just ignore
{State0, ok}
end;
-apply(_, #checkout{spec = {dequeue, _}},
- #state{messages = M,
- prefix_msg_counts = {0, 0}} = State0) when map_size(M) == 0 ->
- %% FIXME: also check if there are returned messages
- %% TODO do we need metric visibility of empty get requests?
- {State0, {dequeue, empty}};
-apply(Meta, #checkout{spec = {dequeue, settled}, meta = ConsumerMeta,
+apply(Meta, #checkout{spec = {dequeue, Settlement},
+ meta = ConsumerMeta,
consumer_id = ConsumerId},
- State0) ->
- % TODO: this clause could probably be optimised
- State1 = update_consumer(ConsumerId, ConsumerMeta,
- {once, 1, simple_prefetch}, State0),
- % turn send msg effect into reply
- {success, _, MsgId, Msg, State2} = checkout_one(State1),
- % immediately settle
- {State, _, Effects} = apply(Meta, make_settle(ConsumerId, [MsgId]), State2),
- {State, {dequeue, {MsgId, Msg}}, Effects};
-apply(_, #checkout{spec = {dequeue, unsettled},
- meta = ConsumerMeta, consumer_id = {_, Pid} = ConsumerId},
- State0) ->
- State1 = update_consumer(ConsumerId, ConsumerMeta,
- {once, 1, simple_prefetch}, State0),
- case checkout_one(State1) of
- {success, _, MsgId, Msg, S} ->
- {S, {dequeue, {MsgId, Msg}}, [{monitor, process, Pid}]};
- {inactive, S} ->
- {S, {dequeue, empty}, [{aux, inactive}]};
- S ->
- {S, {dequeue, empty}}
+ #state{consumers = Consumers} = State0) ->
+ Exists = maps:is_key(ConsumerId, Consumers),
+ case messages_ready(State0) of
+ 0 ->
+ {State0, {dequeue, empty}};
+ _ when Exists ->
+ %% a dequeue using the same consumer_id isn't possible at this point
+ {State0, {dequeue, empty}};
+ _ ->
+ State1 = update_consumer(ConsumerId, ConsumerMeta,
+ {once, 1, simple_prefetch}, State0),
+ {success, _, MsgId, Msg, State2} = checkout_one(State1),
+ case Settlement of
+ unsettled ->
+ {_, Pid} = ConsumerId,
+ {State2, {dequeue, {MsgId, Msg}},
+ [{monitor, process, Pid}]};
+ settled ->
+ %% immediately settle the checkout
+ {State, _, Effects} = apply(Meta,
+ make_settle(ConsumerId, [MsgId]),
+ State2),
+ {State, {dequeue, {MsgId, Msg}}, Effects}
+ end
end;
-apply(_, #checkout{spec = cancel, consumer_id = ConsumerId}, State0) ->
+apply(Meta, #checkout{spec = cancel, consumer_id = ConsumerId}, State0) ->
{State, Effects} = cancel_consumer(ConsumerId, State0, []),
% TODO: here we should really demonitor the pid but _only_ if it has no
- % other consumers or enqueuers.
- checkout(State, Effects);
-apply(_, #checkout{spec = Spec, meta = Meta,
- consumer_id = {_, Pid} = ConsumerId},
+ % other consumers or enqueuers. Leaving a monitor in place isn't harmful
+ % however
+ checkout(Meta, State, Effects);
+apply(Meta, #checkout{spec = Spec, meta = ConsumerMeta,
+ consumer_id = {_, Pid} = ConsumerId},
State0) ->
- State1 = update_consumer(ConsumerId, Meta, Spec, State0),
- checkout(State1, [{monitor, process, Pid}]);
+ State1 = update_consumer(ConsumerId, ConsumerMeta, Spec, State0),
+ checkout(Meta, State1, [{monitor, process, Pid}]);
apply(#{index := RaftIdx}, #purge{},
#state{ra_indexes = Indexes0,
messages = Messages} = State0) ->
@@ -486,7 +490,7 @@ apply(_, {down, ConsumerPid, noconnection},
%% TODO: should we run a checkout here?
{State#state{consumers = Cons, enqueuers = Enqs,
waiting_consumers = WaitingConsumers}, ok, Effects};
-apply(_, {down, Pid, _Info}, #state{consumers = Cons0,
+apply(Meta, {down, Pid, _Info}, #state{consumers = Cons0,
enqueuers = Enqs0} = State0) ->
% Remove any enqueuer for the same pid and enqueue any pending messages
% This should be ok as we won't see any more enqueues from this pid
@@ -506,8 +510,8 @@ apply(_, {down, Pid, _Info}, #state{consumers = Cons0,
{State, Effects} = lists:foldl(fun(ConsumerId, {S, E}) ->
cancel_consumer(ConsumerId, S, E)
end, {State2, Effects1}, DownConsumers),
- checkout(State, Effects);
-apply(_, {nodeup, Node}, #state{consumers = Cons0,
+ checkout(Meta, State, Effects);
+apply(Meta, {nodeup, Node}, #state{consumers = Cons0,
enqueuers = Enqs0,
service_queue = SQ0} = State0) ->
%% A node we are monitoring has come back.
@@ -535,13 +539,13 @@ apply(_, {nodeup, Node}, #state{consumers = Cons0,
Acc
end, {Cons0, SQ0, Monitors}, Cons0),
% TODO: avoid list concat
- checkout(State0#state{consumers = Cons1, enqueuers = Enqs1,
- service_queue = SQ,
- waiting_consumers = WaitingConsumers}, Effects);
+ checkout(Meta, State0#state{consumers = Cons1, enqueuers = Enqs1,
+ service_queue = SQ,
+ waiting_consumers = WaitingConsumers}, Effects);
apply(_, {nodedown, _Node}, State) ->
{State, ok};
-apply(_, #update_config{config = Conf}, State) ->
- {update_config(Conf, State), ok}.
+apply(Meta, #update_config{config = Conf}, State) ->
+ checkout(Meta, update_config(Conf, State), []).
handle_waiting_consumer_down(_Pid,
#state{consumer_strategy = default} = State) ->
@@ -592,7 +596,7 @@ state_enter(leader, #state{consumers = Cons,
enqueuers = Enqs,
waiting_consumers = WaitingConsumers,
name = Name,
- prefix_msg_counts = {0, 0},
+ prefix_msgs = {[], []},
become_leader_handler = BLH}) ->
% return effects to monitor all current consumers and enqueuers
Pids = lists:usort(maps:keys(Enqs)
@@ -608,10 +612,10 @@ state_enter(leader, #state{consumers = Cons,
{Mod, Fun, Args} ->
[{mod_call, Mod, Fun, Args ++ [Name]} | Effects]
end;
-state_enter(recovered, #state{prefix_msg_counts = PrefixMsgCounts})
- when PrefixMsgCounts =/= {0, 0} ->
+state_enter(recovered, #state{prefix_msgs = PrefixMsgCounts})
+ when PrefixMsgCounts =/= {[], []} ->
%% TODO: remove assertion?
- exit({rabbit_fifo, unexpected_prefix_msg_counts, PrefixMsgCounts});
+ exit({rabbit_fifo, unexpected_prefix_msgs, PrefixMsgCounts});
state_enter(eol, #state{enqueuers = Enqs,
consumers = Custs0,
waiting_consumers = WaitingConsumers0}) ->
@@ -629,14 +633,12 @@ state_enter(_, _) ->
-spec tick(non_neg_integer(), state()) -> ra_machine:effects().
tick(_Ts, #state{name = Name,
queue_resource = QName,
- messages = Messages,
- ra_indexes = Indexes,
msg_bytes_enqueue = EnqueueBytes,
msg_bytes_checkout = CheckoutBytes} = State) ->
Metrics = {Name,
- maps:size(Messages), % Ready
+ messages_ready(State),
num_checked_out(State), % checked out
- rabbit_fifo_index:size(Indexes), %% Total
+ messages_total(State),
query_consumer_count(State), % Consumers
EnqueueBytes,
CheckoutBytes},
@@ -649,8 +651,7 @@ overview(#state{consumers = Cons,
messages = Messages,
ra_indexes = Indexes,
msg_bytes_enqueue = EnqueueBytes,
- msg_bytes_checkout = CheckoutBytes
- } = State) ->
+ msg_bytes_checkout = CheckoutBytes} = State) ->
#{type => ?MODULE,
num_consumers => maps:size(Cons),
num_checked_out => num_checked_out(State),
@@ -693,13 +694,17 @@ handle_aux(_, cast, Cmd, {Name, Use0}, Log, _) ->
%%% Queries
-query_messages_ready(#state{messages = M}) ->
- M.
+%% TODO: this doesn't take returns into account
+query_messages_ready(State) ->
+ messages_ready(State).
query_messages_checked_out(#state{consumers = Consumers}) ->
maps:fold(fun (_, #consumer{checked_out = C}, S) ->
- maps:merge(S, maps:from_list(maps:values(C)))
- end, #{}, Consumers).
+ maps:size(C) + S
+ end, 0, Consumers).
+
+query_messages_total(State) ->
+ messages_total(State).
query_processes(#state{enqueuers = Enqs, consumers = Cons0}) ->
Cons = maps:fold(fun({_, P}, V, S) -> S#{P => V} end, #{}, Cons0),
@@ -770,6 +775,18 @@ usage(Name) when is_atom(Name) ->
%%% Internal
+messages_ready(#state{messages = M,
+ prefix_msgs = {PreR, PreM},
+ returns = R}) ->
+
+ %% TODO: optimise to avoid length/1 call
+ maps:size(M) + lqueue:len(R) + length(PreR) + length(PreM).
+
+messages_total(#state{ra_indexes = I,
+ prefix_msgs = {PreR, PreM}}) ->
+
+ rabbit_fifo_index:size(I) + length(PreR) + length(PreM).
+
update_use({inactive, _, _, _} = CUInfo, inactive) ->
CUInfo;
update_use({active, _, _} = CUInfo, active) ->
@@ -879,6 +896,35 @@ cancel_consumer0(ConsumerId,
{S0, Effects0}
end.
+apply_enqueue(#{index := RaftIdx} = Meta, From, Seq, RawMsg, State0) ->
+ Bytes = message_size(RawMsg),
+ case maybe_enqueue(RaftIdx, From, Seq, RawMsg, [], State0) of
+ {ok, State1, Effects1} ->
+ State2 = append_to_master_index(RaftIdx,
+ add_bytes_enqueue(Bytes, State1)),
+ {State, ok, Effects} = checkout(Meta, State2, Effects1),
+ {maybe_store_dehydrated_state(RaftIdx, State), ok, Effects};
+ {duplicate, State, Effects} ->
+ {State, ok, Effects}
+ end.
+
+drop_head(#state{ra_indexes = Indexes0} = State0, Effects0) ->
+ case take_next_msg(State0) of
+ {FullMsg = {_MsgId, {RaftIdxToDrop, {_Header, Msg}}},
+ State1} ->
+ Indexes = rabbit_fifo_index:delete(RaftIdxToDrop, Indexes0),
+ Bytes = message_size(Msg),
+ State = add_bytes_drop(Bytes, State1#state{ra_indexes = Indexes}),
+ Effects = dead_letter_effects(maps:put(none, FullMsg, #{}),
+ State, Effects0),
+ {State, Effects};
+ {{'$prefix_msg', Bytes}, State1} ->
+ State = add_bytes_drop(Bytes, State1),
+ {State, Effects0};
+ empty ->
+ {State0, Effects0}
+ end.
+
enqueue(RaftIdx, RawMsg, #state{messages = Messages,
low_msg_num = LowMsgNum,
next_msg_num = NextMsgNum} = State0) ->
@@ -889,19 +935,28 @@ enqueue(RaftIdx, RawMsg, #state{messages = Messages,
low_msg_num = min(LowMsgNum, NextMsgNum),
next_msg_num = NextMsgNum + 1}.
-append_to_master_index(RaftIdx, Effects,
+append_to_master_index(RaftIdx,
#state{ra_indexes = Indexes0} = State0) ->
- {State, Shadow} = incr_enqueue_count(State0),
- Indexes = rabbit_fifo_index:append(RaftIdx, Shadow, Indexes0),
- {State#state{ra_indexes = Indexes}, ok, Effects}.
+ State = incr_enqueue_count(State0),
+ Indexes = rabbit_fifo_index:append(RaftIdx, undefined, Indexes0),
+ State#state{ra_indexes = Indexes}.
incr_enqueue_count(#state{enqueue_count = C,
shadow_copy_interval = C} = State0) ->
- % time to stash a dehydrated state version
- State = State0#state{enqueue_count = 0},
- {State, dehydrate_state(State)};
+ % this will trigger a dehydrated version of the state to be stored
+ % at this raft index for potential future snapshot generation
+ State0#state{enqueue_count = 0};
incr_enqueue_count(#state{enqueue_count = C} = State) ->
- {State#state{enqueue_count = C + 1}, undefined}.
+ State#state{enqueue_count = C + 1}.
+
+maybe_store_dehydrated_state(RaftIdx, #state{enqueue_count = 0,
+ ra_indexes = Indexes} = State) ->
+ Dehydrated = dehydrate_state(State),
+ State#state{ra_indexes =
+ rabbit_fifo_index:update_if_present(RaftIdx, Dehydrated,
+ Indexes)};
+maybe_store_dehydrated_state(_RaftIdx, State) ->
+ State.
enqueue_pending(From,
@@ -916,7 +971,8 @@ enqueue_pending(From, Enq, #state{enqueuers = Enqueuers0} = State) ->
maybe_enqueue(RaftIdx, undefined, undefined, RawMsg, Effects, State0) ->
% direct enqueue without tracking
- {ok, enqueue(RaftIdx, RawMsg, State0), Effects};
+ State = enqueue(RaftIdx, RawMsg, State0),
+ {ok, State, Effects};
maybe_enqueue(RaftIdx, From, MsgSeqNo, RawMsg, Effects0,
#state{enqueuers = Enqueuers0} = State0) ->
case maps:get(From, Enqueuers0, undefined) of
@@ -947,19 +1003,19 @@ maybe_enqueue(RaftIdx, From, MsgSeqNo, RawMsg, Effects0,
snd(T) ->
element(2, T).
-return(ConsumerId, MsgNumMsgs, Con0, Checked,
+return(Meta, ConsumerId, MsgNumMsgs, Con0, Checked,
Effects0, #state{consumers = Cons0, service_queue = SQ0} = State0) ->
Con = Con0#consumer{checked_out = Checked,
credit = increase_credit(Con0, length(MsgNumMsgs))},
{Cons, SQ, Effects} = update_or_remove_sub(ConsumerId, Con, Cons0,
SQ0, Effects0),
- State1 = lists:foldl(fun('$prefix_msg' = Msg, S0) ->
+ State1 = lists:foldl(fun({'$prefix_msg', _} = Msg, S0) ->
return_one(0, Msg, S0);
({MsgNum, Msg}, S0) ->
return_one(MsgNum, Msg, S0)
end, State0, MsgNumMsgs),
- checkout(State1#state{consumers = Cons,
- service_queue = SQ},
+ checkout(Meta, State1#state{consumers = Cons,
+ service_queue = SQ},
Effects).
% used to processes messages that are finished
@@ -990,7 +1046,7 @@ increase_credit(#consumer{lifetime = auto,
increase_credit(#consumer{credit = Current}, Credit) ->
Current + Credit.
-complete_and_checkout(IncomingRaftIdx, MsgIds, ConsumerId,
+complete_and_checkout(#{index := IncomingRaftIdx} = Meta, MsgIds, ConsumerId,
#consumer{checked_out = Checked0} = Con0,
Effects0, #state{ra_indexes = Indexes0} = State0) ->
Checked = maps:without(MsgIds, Checked0),
@@ -998,15 +1054,15 @@ complete_and_checkout(IncomingRaftIdx, MsgIds, ConsumerId,
MsgRaftIdxs = [RIdx || {_, {RIdx, _}} <- maps:values(Discarded)],
State1 = lists:foldl(fun({_, {_, {_, RawMsg}}}, Acc) ->
add_bytes_settle(RawMsg, Acc);
- (_, Acc) ->
- Acc
+ ({'$prefix_msg', _} = M, Acc) ->
+ add_bytes_settle(M, Acc)
end, State0, maps:values(Discarded)),
%% need to pass the length of discarded as $prefix_msgs would be filtered
%% by the above list comprehension
{State2, Effects1} = complete(ConsumerId, MsgRaftIdxs,
maps:size(Discarded),
Con0, Checked, Effects0, State1),
- {State, ok, Effects} = checkout(State2, Effects1),
+ {State, ok, Effects} = checkout(Meta, State2, Effects1),
% settle metrics are incremented separately
update_smallest_raft_index(IncomingRaftIdx, Indexes0, State, Effects).
@@ -1016,7 +1072,7 @@ dead_letter_effects(_Discarded,
Effects;
dead_letter_effects(Discarded,
#state{dead_letter_handler = {Mod, Fun, Args}}, Effects) ->
- DeadLetters = maps:fold(fun(_, {_, {_, {_, Msg}}},
+ DeadLetters = maps:fold(fun(_, {_, {_, {_Header, Msg}}},
% MsgId, MsgIdID, RaftId, Header
Acc) -> [{rejected, Msg} | Acc]
end, [], Discarded),
@@ -1028,7 +1084,6 @@ cancel_consumer_effects(ConsumerId, #state{queue_resource = QName}, Effects) ->
update_smallest_raft_index(IncomingRaftIdx, OldIndexes,
#state{ra_indexes = Indexes,
- % prefix_msg_count = 0,
messages = Messages} = State, Effects) ->
case rabbit_fifo_index:size(Indexes) of
0 when map_size(Messages) =:= 0 ->
@@ -1057,35 +1112,45 @@ update_smallest_raft_index(IncomingRaftIdx, OldIndexes,
end.
% TODO update message then update messages and returns in single operations
-return_one(0, '$prefix_msg',
+return_one(0, {'$prefix_msg', _} = Msg,
#state{returns = Returns} = State0) ->
- State0#state{returns = lqueue:in('$prefix_msg', Returns)};
+ add_bytes_return(Msg,
+ State0#state{returns = lqueue:in(Msg, Returns)});
return_one(MsgNum, {RaftId, {Header0, RawMsg}},
- #state{messages = Messages,
- returns = Returns} = State0) ->
+ #state{returns = Returns} = State0) ->
Header = maps:update_with(delivery_count,
fun (C) -> C+1 end,
1, Header0),
Msg = {RaftId, {Header, RawMsg}},
% this should not affect the release cursor in any way
add_bytes_return(RawMsg,
- State0#state{messages = maps:put(MsgNum, Msg, Messages),
- returns = lqueue:in(MsgNum, Returns)}).
+ State0#state{returns = lqueue:in({MsgNum, Msg}, Returns)}).
-return_all(State, Checked0) ->
+return_all(State0, Checked0) ->
%% need to sort the list so that we return messages in the order
%% they were checked out
Checked = lists:sort(maps:to_list(Checked0)),
- lists:foldl(fun ({_, '$prefix_msg'}, S) ->
- return_one(0, '$prefix_msg', S);
+ lists:foldl(fun ({_, {'$prefix_msg', _} = Msg}, S) ->
+ return_one(0, Msg, S);
({_, {MsgNum, Msg}}, S) ->
return_one(MsgNum, Msg, S)
- end, State, Checked).
+ end, State0, Checked).
+
+
%% checkout new messages to consumers
%% reverses the effects list
-checkout(State, Effects) ->
- checkout0(checkout_one(State), Effects, #{}).
+checkout(#{index := Index}, State0, Effects0) ->
+ {State1, _Result, Effects1} = checkout0(checkout_one(State0),
+ Effects0, #{}),
+ case evaluate_limit(State0#state.ra_indexes, false,
+ State1, Effects1) of
+ {State, true, Effects} ->
+ update_smallest_raft_index(Index, State0#state.ra_indexes,
+ State, Effects);
+ {State, false, Effects} ->
+ {State, ok, Effects}
+ end.
checkout0({success, ConsumerId, MsgId, Msg, State}, Effects, Acc0) ->
DelMsg = {MsgId, Msg},
@@ -1093,12 +1158,31 @@ checkout0({success, ConsumerId, MsgId, Msg, State}, Effects, Acc0) ->
fun (M) -> [DelMsg | M] end,
[DelMsg], Acc0),
checkout0(checkout_one(State), Effects, Acc);
-checkout0({inactive, State}, Effects0, Acc) ->
- Effects = append_send_msg_effects(Effects0, Acc),
- {State, ok, lists:reverse([{aux, inactive} | Effects])};
-checkout0(State, Effects0, Acc) ->
- Effects = append_send_msg_effects(Effects0, Acc),
- {State, ok, lists:reverse(Effects)}.
+checkout0({Activity, State0}, Effects0, Acc) ->
+ Effects1 = case Activity of
+ nochange ->
+ append_send_msg_effects(Effects0, Acc);
+ inactive ->
+ [{aux, inactive}
+ | append_send_msg_effects(Effects0, Acc)]
+ end,
+ {State0, ok, lists:reverse(Effects1)}.
+
+evaluate_limit(_OldIndexes, Result,
+ #state{max_length = undefined,
+ max_bytes = undefined} = State,
+ Effects) ->
+ {State, Result, Effects};
+evaluate_limit(OldIndexes, Result,
+ State0, Effects0) ->
+ case is_over_limit(State0) of
+ true ->
+ {State, Effects} = drop_head(State0, Effects0),
+ evaluate_limit(OldIndexes, true, State, Effects);
+ false ->
+ {State0, Result, Effects0}
+ %% TODO: optimisation: avoid calling if nothing has been dropped?
+ end.
append_send_msg_effects(Effects, AccMap) when map_size(AccMap) == 0 ->
Effects;
@@ -1108,57 +1192,51 @@ append_send_msg_effects(Effects0, AccMap) ->
end, Effects0, AccMap),
[{aux, active} | Effects].
-next_checkout_message(#state{prefix_msg_counts = {PReturns, P}} = State)
- when PReturns > 0 ->
+%% next message is determined as follows:
+%% First we check if there are prefix returns
+%% Then we check if there are current returns
+%% then we check prefix msgs
+%% then we check current messages
+%%
+%% When we return it is always done to the current return queue
+%% for both prefix messages and current messages
+take_next_msg(#state{prefix_msgs = {[Bytes | Rem], P}} = State) ->
%% there are prefix returns, these should be served first
- {'$prefix_msg', State#state{prefix_msg_counts = {PReturns - 1, P}}};
-next_checkout_message(#state{returns = Returns,
- low_msg_num = Low0,
- prefix_msg_counts = {R, P},
- next_msg_num = NextMsgNum} = State) ->
+ {{'$prefix_msg', Bytes},
+ State#state{prefix_msgs = {Rem, P}}};
+take_next_msg(#state{returns = Returns,
+ low_msg_num = Low0,
+ messages = Messages0,
+ prefix_msgs = {R, P}} = State) ->
%% use peek rather than out there as the most likely case is an empty
%% queue
case lqueue:peek(Returns) of
- {value, Next} ->
- {Next, State#state{returns = lqueue:drop(Returns)}};
- empty when P == 0 ->
+ {value, NextMsg} ->
+ {NextMsg,
+ State#state{returns = lqueue:drop(Returns)}};
+ empty when P == [] ->
case Low0 of
undefined ->
- {undefined, State};
+ empty;
_ ->
- case Low0 + 1 of
- NextMsgNum ->
- %% the map will be empty after this item is removed
- {Low0, State#state{low_msg_num = undefined}};
- Low ->
- {Low0, State#state{low_msg_num = Low}}
+ %% TODO: defensive?
+ {Msg, Messages} = maps:take(Low0, Messages0),
+ case maps:size(Messages) of
+ 0 ->
+ {{Low0, Msg},
+ State#state{messages = Messages,
+ low_msg_num = undefined}};
+ _ ->
+ {{Low0, Msg},
+ State#state{messages = Messages,
+ low_msg_num = Low0 + 1}}
end
end;
empty ->
+ [Bytes | Rem] = P,
%% There are prefix msgs
- {'$prefix_msg', State#state{prefix_msg_counts = {R, P - 1}}}
- end.
-
-%% next message is determined as follows:
-%% First we check if there are are prefex returns
-%% Then we check if there are current returns
-%% then we check prefix msgs
-%% then we check current messages
-%%
-%% When we return it is always done to the current return queue
-%% for both prefix messages and current messages
-take_next_msg(#state{messages = Messages0} = State0) ->
- case next_checkout_message(State0) of
- {'$prefix_msg', State} ->
- {'$prefix_msg', State, Messages0};
- {NextMsgInId, State} ->
- %% messages are available
- case maps:take(NextMsgInId, Messages0) of
- {IdxMsg, Messages} ->
- {{NextMsgInId, IdxMsg}, State, Messages};
- error ->
- error
- end
+ {{'$prefix_msg', Bytes},
+ State#state{prefix_msgs = {R, Rem}}}
end.
send_msg_effect({CTag, CPid}, Msgs) ->
@@ -1170,7 +1248,7 @@ checkout_one(#state{service_queue = SQ0,
case queue:peek(SQ0) of
{value, ConsumerId} ->
case take_next_msg(InitState) of
- {ConsumerMsg, State0, Messages} ->
+ {ConsumerMsg, State0} ->
SQ1 = queue:drop(SQ0),
%% there are consumers waiting to be serviced
%% process consumer checkout
@@ -1195,31 +1273,31 @@ checkout_one(#state{service_queue = SQ0,
update_or_remove_sub(ConsumerId, Con,
Cons0, SQ1, []),
State1 = State0#state{service_queue = SQ,
- messages = Messages,
consumers = Cons},
{State, Msg} =
case ConsumerMsg of
- '$prefix_msg' ->
- {State1, '$prefix_msg'};
+ {'$prefix_msg', _} ->
+ {add_bytes_checkout(ConsumerMsg, State1),
+ ConsumerMsg};
{_, {_, {_, RawMsg} = M}} ->
- {add_bytes_checkout(RawMsg, State1), M}
+ {add_bytes_checkout(RawMsg, State1),
+ M}
end,
{success, ConsumerId, Next, Msg, State};
error ->
%% consumer did not exist but was queued, recurse
checkout_one(InitState#state{service_queue = SQ1})
end;
- error ->
- InitState
+ empty ->
+ {nochange, InitState}
end;
empty ->
case maps:size(Messages0) of
- 0 -> InitState;
+ 0 -> {nochange, InitState};
_ -> {inactive, InitState}
end
end.
-
update_or_remove_sub(ConsumerId, #consumer{lifetime = auto,
credit = 0} = Con,
Cons, ServiceQueue, Effects) ->
@@ -1262,14 +1340,14 @@ update_consumer(ConsumerId, Meta, Spec,
%% general case, single active consumer off
update_consumer0(ConsumerId, Meta, Spec, State0);
update_consumer(ConsumerId, Meta, Spec,
- #state{consumers = Cons0,
+ #state{consumers = Cons0,
consumer_strategy = single_active} = State0)
when map_size(Cons0) == 0 ->
%% single active consumer on, no one is consuming yet
update_consumer0(ConsumerId, Meta, Spec, State0);
update_consumer(ConsumerId, Meta, {Life, Credit, Mode},
- #state{consumer_strategy = single_active,
- waiting_consumers = WaitingConsumers0} = State0) ->
+ #state{consumer_strategy = single_active,
+ waiting_consumers = WaitingConsumers0} = State0) ->
%% single active consumer on and one active consumer already
%% adding the new consumer to the waiting list
Consumer = #consumer{lifetime = Life, meta = Meta,
@@ -1289,11 +1367,10 @@ update_consumer0(ConsumerId, Meta, {Life, Credit, Mode},
%% the credit update
N = maps:size(S#consumer.checked_out),
C = max(0, Credit - N),
- S#consumer{lifetime = Life,
- credit = C}
+ S#consumer{lifetime = Life, credit = C}
end, Init, Cons0),
ServiceQueue = maybe_queue_consumer(ConsumerId, maps:get(ConsumerId, Cons),
- ServiceQueue0),
+ ServiceQueue0),
State0#state{consumers = Cons, service_queue = ServiceQueue}.
@@ -1313,13 +1390,20 @@ maybe_queue_consumer(ConsumerId, #consumer{credit = Credit},
dehydrate_state(#state{messages = Messages,
consumers = Consumers,
returns = Returns,
- prefix_msg_counts = {PrefRetCnt, MsgCount}} = State) ->
- %% TODO: optimise to avoid having to iterate the queue to get the number
- %% of current returned messages
- RetLen = lqueue:len(Returns), % O(1)
- CurReturns = length([R || R <- lqueue:to_list(Returns),
- R =/= '$prefix_msg']),
- PrefixMsgCnt = MsgCount + maps:size(Messages) - CurReturns,
+ prefix_msgs = {PrefRet0, PrefMsg0}} = State) ->
+ %% TODO: optimise this function as far as possible
+ PrefRet = lists:foldl(fun ({'$prefix_msg', Bytes}, Acc) ->
+ [Bytes | Acc];
+ ({_, {_, {_, Raw}}}, Acc) ->
+ [message_size(Raw) | Acc]
+ end,
+ lists:reverse(PrefRet0),
+ lqueue:to_list(Returns)),
+ PrefMsgs = lists:foldl(fun ({_, {_RaftIdx, {_H, Raw}}}, Acc) ->
+ [message_size(Raw) | Acc]
+ end,
+ lists:reverse(PrefMsg0),
+ lists:sort(maps:to_list(Messages))),
State#state{messages = #{},
ra_indexes = rabbit_fifo_index:empty(),
low_msg_num = undefined,
@@ -1327,14 +1411,26 @@ dehydrate_state(#state{messages = Messages,
dehydrate_consumer(C)
end, Consumers),
returns = lqueue:new(),
- %% messages include returns
- prefix_msg_counts = {RetLen + PrefRetCnt,
- PrefixMsgCnt}}.
+ prefix_msgs = {lists:reverse(PrefRet),
+ lists:reverse(PrefMsgs)}}.
dehydrate_consumer(#consumer{checked_out = Checked0} = Con) ->
- Checked = maps:map(fun (_, _) -> '$prefix_msg' end, Checked0),
+ Checked = maps:map(fun (_, {'$prefix_msg', _} = M) ->
+ M;
+ (_, {_, {_, {_, Raw}}}) ->
+ {'$prefix_msg', message_size(Raw)}
+ end, Checked0),
Con#consumer{checked_out = Checked}.
+is_over_limit(#state{max_length = undefined,
+ max_bytes = undefined}) ->
+ false;
+is_over_limit(#state{max_length = MaxLength,
+ max_bytes = MaxBytes,
+ msg_bytes_enqueue = BytesEnq} = State) ->
+
+ messages_ready(State) > MaxLength orelse (BytesEnq > MaxBytes).
+
-spec make_enqueue(maybe(pid()), maybe(msg_seqno()), raw_msg()) -> protocol().
make_enqueue(Pid, Seq, Msg) ->
#enqueue{pid = Pid, seq = Seq, msg = Msg}.
@@ -1371,10 +1467,12 @@ make_purge() -> #purge{}.
make_update_config(Config) ->
#update_config{config = Config}.
-add_bytes_enqueue(Msg, #state{msg_bytes_enqueue = Enqueue} = State) ->
- Bytes = message_size(Msg),
+add_bytes_enqueue(Bytes, #state{msg_bytes_enqueue = Enqueue} = State) ->
State#state{msg_bytes_enqueue = Enqueue + Bytes}.
+add_bytes_drop(Bytes, #state{msg_bytes_enqueue = Enqueue} = State) ->
+ State#state{msg_bytes_enqueue = Enqueue - Bytes}.
+
add_bytes_checkout(Msg, #state{msg_bytes_checkout = Checkout,
msg_bytes_enqueue = Enqueue } = State) ->
Bytes = message_size(Msg),
@@ -1394,6 +1492,8 @@ add_bytes_return(Msg, #state{msg_bytes_checkout = Checkout,
message_size(#basic_message{content = Content}) ->
#content{payload_fragments_rev = PFR} = Content,
iolist_size(PFR);
+message_size({'$prefix_msg', B}) ->
+ B;
message_size(B) when is_binary(B) ->
byte_size(B);
message_size(Msg) ->
@@ -1554,7 +1654,7 @@ enq_enq_deq_deq_settle_test() ->
{State3, {dequeue, {0, {_, first}}}, [{monitor, _, _}]} =
apply(meta(3), make_checkout(Cid, {dequeue, unsettled}, #{}),
State2),
- {_State4, {dequeue, empty}, _} =
+ {_State4, {dequeue, empty}} =
apply(meta(4), make_checkout(Cid, {dequeue, unsettled}, #{}),
State3),
ok.
@@ -1599,7 +1699,7 @@ release_cursor_test() ->
checkout_enq_settle_test() ->
Cid = {?FUNCTION_NAME, self()},
- {State1, [{monitor, _, _}]} = check(Cid, 1, test_init(test)),
+ {State1, [{monitor, _, _} | _]} = check(Cid, 1, test_init(test)),
{State2, Effects0} = enq(2, 1, first, State1),
?ASSERT_EFF({send_msg, _,
{delivery, ?FUNCTION_NAME,
@@ -1614,7 +1714,7 @@ checkout_enq_settle_test() ->
out_of_order_enqueue_test() ->
Cid = {?FUNCTION_NAME, self()},
- {State1, [{monitor, _, _}]} = check_n(Cid, 5, 5, test_init(test)),
+ {State1, [{monitor, _, _} | _]} = check_n(Cid, 5, 5, test_init(test)),
{State2, Effects2} = enq(2, 1, first, State1),
?ASSERT_EFF({send_msg, _, {delivery, _, [{_, {_, first}}]}, _}, Effects2),
% assert monitor was set up
@@ -1644,7 +1744,7 @@ out_of_order_first_enqueue_test() ->
duplicate_enqueue_test() ->
Cid = {<<"duplicate_enqueue_test">>, self()},
- {State1, [{monitor, _, _}]} = check_n(Cid, 5, 5, test_init(test)),
+ {State1, [{monitor, _, _} | _]} = check_n(Cid, 5, 5, test_init(test)),
{State2, Effects2} = enq(2, 1, first, State1),
?ASSERT_EFF({send_msg, _, {delivery, _, [{_, {_, first}}]}, _}, Effects2),
{_State3, Effects3} = enq(3, 1, first, State2),
@@ -1663,10 +1763,10 @@ return_checked_out_test() ->
{State0, [_, _]} = enq(1, 1, first, test_init(test)),
{State1, [_Monitor,
{send_msg, _, {delivery, _, [{MsgId, _}]}, ra_event},
- {aux, active}
+ {aux, active} | _
]} = check(Cid, 2, State0),
% return
- {_State2, _, [_, _]} = apply(meta(3), make_return(Cid, [MsgId]), State1),
+ {_State2, _, [_]} = apply(meta(3), make_return(Cid, [MsgId]), State1),
ok.
return_auto_checked_out_test() ->
@@ -1695,7 +1795,8 @@ cancelled_checkout_out_test() ->
{State1, _} = check_auto(Cid, 2, State0),
% cancelled checkout should return all pending messages to queue
{State2, _, _} = apply(meta(3), make_checkout(Cid, cancel, #{}), State1),
- ?assertEqual(2, maps:size(State2#state.messages)),
+ ?assertEqual(1, maps:size(State2#state.messages)),
+ ?assertEqual(1, lqueue:len(State2#state.returns)),
{State3, {dequeue, {0, {_, first}}}, _} =
apply(meta(3), make_checkout(Cid, {dequeue, settled}, #{}), State2),
@@ -1748,14 +1849,14 @@ down_with_noconnection_returns_unack_test() ->
?assertEqual(0, maps:size(State1#state.messages)),
?assertEqual(0, lqueue:len(State1#state.returns)),
{State2a, _, _} = apply(meta(3), {down, Pid, noconnection}, State1),
- ?assertEqual(1, maps:size(State2a#state.messages)),
+ ?assertEqual(0, maps:size(State2a#state.messages)),
?assertEqual(1, lqueue:len(State2a#state.returns)),
ok.
down_with_noproc_enqueuer_is_cleaned_up_test() ->
State00 = test_init(test),
Pid = spawn(fun() -> ok end),
- {State0, _, Effects0} = apply(meta(1), {enqueue, Pid, 1, first}, State00),
+ {State0, _, Effects0} = apply(meta(1), make_enqueue(Pid, 1, first), State00),
?ASSERT_EFF({monitor, process, _}, Effects0),
{State1, _, _} = apply(meta(3), {down, Pid, noproc}, State0),
% ensure there are no enqueuers
@@ -2116,7 +2217,7 @@ single_active_consumer_test() ->
% adding some consumers
AddConsumer = fun(CTag, State) ->
{NewState, _, _} = apply(
- #{},
+ meta(1),
#checkout{spec = {once, 1, simple_prefetch},
meta = #{},
consumer_id = {CTag, self()}},
@@ -2134,7 +2235,8 @@ single_active_consumer_test() ->
?assertNotEqual(false, lists:keyfind({<<"ctag4">>, self()}, 1, State1#state.waiting_consumers)),
% cancelling a waiting consumer
- {State2, _, Effects1} = apply(#{}, #checkout{spec = cancel, consumer_id = {<<"ctag3">>, self()}}, State1),
+ {State2, _, Effects1} = apply(meta(2),
+ #checkout{spec = cancel, consumer_id = {<<"ctag3">>, self()}}, State1),
% the active consumer should still be in place
?assertEqual(1, map_size(State2#state.consumers)),
?assert(maps:is_key({<<"ctag1">>, self()}, State2#state.consumers)),
@@ -2146,7 +2248,7 @@ single_active_consumer_test() ->
?assertEqual(1, length(Effects1)),
% cancelling the active consumer
- {State3, _, Effects2} = apply(#{}, #checkout{spec = cancel, consumer_id = {<<"ctag1">>, self()}}, State2),
+ {State3, _, Effects2} = apply(meta(3), #checkout{spec = cancel, consumer_id = {<<"ctag1">>, self()}}, State2),
% the second registered consumer is now the active one
?assertEqual(1, map_size(State3#state.consumers)),
?assert(maps:is_key({<<"ctag2">>, self()}, State3#state.consumers)),
@@ -2157,7 +2259,7 @@ single_active_consumer_test() ->
?assertEqual(2, length(Effects2)),
% cancelling the active consumer
- {State4, _, Effects3} = apply(#{}, #checkout{spec = cancel, consumer_id = {<<"ctag2">>, self()}}, State3),
+ {State4, _, Effects3} = apply(meta(4), #checkout{spec = cancel, consumer_id = {<<"ctag2">>, self()}}, State3),
% the last waiting consumer became the active one
?assertEqual(1, map_size(State4#state.consumers)),
?assert(maps:is_key({<<"ctag4">>, self()}, State4#state.consumers)),
@@ -2167,7 +2269,7 @@ single_active_consumer_test() ->
?assertEqual(2, length(Effects3)),
% cancelling the last consumer
- {State5, _, Effects4} = apply(#{}, #checkout{spec = cancel, consumer_id = {<<"ctag4">>, self()}}, State4),
+ {State5, _, Effects4} = apply(meta(5), #checkout{spec = cancel, consumer_id = {<<"ctag4">>, self()}}, State4),
% no active consumer anymore
?assertEqual(0, map_size(State5#state.consumers)),
% still nothing in the waiting list
@@ -2192,7 +2294,7 @@ single_active_consumer_cancel_consumer_when_channel_is_down_test() ->
% adding some consumers
AddConsumer = fun({CTag, ChannelId}, State) ->
{NewState, _, _} = apply(
- #{},
+ #{index => 1},
#checkout{spec = {once, 1, simple_prefetch},
meta = #{},
consumer_id = {CTag, ChannelId}},
@@ -2203,16 +2305,17 @@ single_active_consumer_cancel_consumer_when_channel_is_down_test() ->
[{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]),
% the channel of the active consumer goes down
- {State2, _, Effects} = apply(#{}, {down, Pid1, doesnotmatter}, State1),
+ {State2, _, Effects} = apply(#{index => 2}, {down, Pid1, doesnotmatter}, State1),
% fell back to another consumer
?assertEqual(1, map_size(State2#state.consumers)),
% there are still waiting consumers
?assertEqual(2, length(State2#state.waiting_consumers)),
- % effects to unregister the consumer and to update the new active one (metrics) are there
+ % effects to unregister the consumer and
+ % to update the new active one (metrics) are there
?assertEqual(2, length(Effects)),
% the channel of the active consumer and a waiting consumer goes down
- {State3, _, Effects2} = apply(#{}, {down, Pid2, doesnotmatter}, State2),
+ {State3, _, Effects2} = apply(#{index => 3}, {down, Pid2, doesnotmatter}, State2),
% fell back to another consumer
?assertEqual(1, map_size(State3#state.consumers)),
% no more waiting consumer
@@ -2221,7 +2324,7 @@ single_active_consumer_cancel_consumer_when_channel_is_down_test() ->
?assertEqual(3, length(Effects2)),
% the last channel goes down
- {State4, _, Effects3} = apply(#{}, {down, Pid3, doesnotmatter}, State3),
+ {State4, _, Effects3} = apply(#{index => 4}, {down, Pid3, doesnotmatter}, State3),
% no more consumers
?assertEqual(0, map_size(State4#state.consumers)),
?assertEqual(0, length(State4#state.waiting_consumers)),
@@ -2237,10 +2340,11 @@ single_active_consumer_mark_waiting_consumers_as_suspected_when_down_noconnnecti
shadow_copy_interval => 0,
single_active_consumer_on => true}),
+ Meta = #{index => 1},
% adding some consumers
AddConsumer = fun(CTag, State) ->
{NewState, _, _} = apply(
- #{},
+ Meta,
#checkout{spec = {once, 1, simple_prefetch},
meta = #{},
consumer_id = {CTag, self()}},
@@ -2259,7 +2363,7 @@ single_active_consumer_mark_waiting_consumers_as_suspected_when_down_noconnnecti
end, State2#state.waiting_consumers),
% simulate node goes back up
- {State3, _, _} = apply(#{}, {nodeup, node(self())}, State2),
+ {State3, _, _} = apply(#{index => 2}, {nodeup, node(self())}, State2),
% all the waiting consumers should be un-suspected
?assertEqual(3, length(State3#state.waiting_consumers)),
@@ -2281,10 +2385,11 @@ single_active_consumer_state_enter_leader_include_waiting_consumers_test() ->
Pid2 = spawn(DummyFunction),
Pid3 = spawn(DummyFunction),
+ Meta = #{index => 1},
% adding some consumers
AddConsumer = fun({CTag, ChannelId}, State) ->
{NewState, _, _} = apply(
- #{},
+ Meta,
#checkout{spec = {once, 1, simple_prefetch},
meta = #{},
consumer_id = {CTag, ChannelId}},
@@ -2310,10 +2415,11 @@ single_active_consumer_state_enter_eol_include_waiting_consumers_test() ->
Pid2 = spawn(DummyFunction),
Pid3 = spawn(DummyFunction),
+ Meta = #{index => 1},
% adding some consumers
AddConsumer = fun({CTag, ChannelId}, State) ->
{NewState, _, _} = apply(
- #{},
+ Meta,
#checkout{spec = {once, 1, simple_prefetch},
meta = #{},
consumer_id = {CTag, ChannelId}},
@@ -2333,11 +2439,11 @@ query_consumers_test() ->
atom_to_binary(?FUNCTION_NAME, utf8)),
shadow_copy_interval => 0,
single_active_consumer_on => true}),
-
+ Meta = #{index => 1},
% adding some consumers
AddConsumer = fun(CTag, State) ->
{NewState, _, _} = apply(
- #{},
+ Meta,
#checkout{spec = {once, 1, simple_prefetch},
meta = #{},
consumer_id = {CTag, self()}},
diff --git a/src/rabbit_fifo_index.erl b/src/rabbit_fifo_index.erl
index f8f414f453..184002611e 100644
--- a/src/rabbit_fifo_index.erl
+++ b/src/rabbit_fifo_index.erl
@@ -4,12 +4,14 @@
empty/0,
fetch/2,
append/3,
+ update_if_present/3,
return/3,
delete/2,
size/1,
smallest/1,
next_key_after/2,
- map/2
+ map/2,
+ to_map/1
]).
-include_lib("ra/include/ra.hrl").
@@ -36,12 +38,22 @@ fetch(Key, #?MODULE{data = Data}) ->
-spec append(integer(), term(), state()) -> state().
append(Key, Value,
#?MODULE{data = Data,
- smallest = Smallest,
- largest = Largest} = State)
+ smallest = Smallest,
+ largest = Largest} = State)
when Key > Largest orelse Largest =:= undefined ->
State#?MODULE{data = maps:put(Key, Value, Data),
- smallest = ra_lib:default(Smallest, Key),
- largest = Key}.
+ smallest = ra_lib:default(Smallest, Key),
+ largest = Key}.
+
+-spec update_if_present(integer(), term(), state()) -> state().
+update_if_present(Key, Value, #?MODULE{data = Data} = State) ->
+ case Data of
+ #{Key := _} ->
+ State#?MODULE{data = maps:put(Key, Value, Data)};
+ _ ->
+ State
+ end.
+
-spec return(integer(), term(), state()) -> state().
return(Key, Value, #?MODULE{data = Data, smallest = Smallest} = State)
@@ -76,6 +88,10 @@ delete(Key, #?MODULE{data = Data} = State) ->
size(#?MODULE{data = Data}) ->
maps:size(Data).
+-spec to_map(state()) -> #{integer() => term()}.
+to_map(#?MODULE{data = Data}) ->
+ Data.
+
-spec smallest(state()) -> undefined | {integer(), term()}.
smallest(#?MODULE{smallest = undefined}) ->
undefined;
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index 5c0d1c0070..1f3aa1f38b 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -152,10 +152,15 @@ ra_machine(Q) ->
ra_machine_config(Q = #amqqueue{name = QName,
pid = {Name, _}}) ->
+ %% take the minimum value of the policy and the queue arg if present
+ MaxLength = args_policy_lookup(<<"max-length">>, fun min/2, Q),
+ MaxBytes = args_policy_lookup(<<"max-length-bytes">>, fun min/2, Q),
#{name => Name,
queue_resource => QName,
dead_letter_handler => dlx_mfa(Q),
become_leader_handler => {?MODULE, become_leader, [QName]},
+ max_length => MaxLength,
+ max_bytes => MaxBytes,
single_active_consumer_on => single_active_consumer_on(Q)}.
single_active_consumer_on(#amqqueue{arguments = QArguments}) ->
@@ -636,8 +641,10 @@ delete_member(#amqqueue{pid = {RaName, _}, name = QName}, Node) ->
%%----------------------------------------------------------------------------
dlx_mfa(Q) ->
- DLX = init_dlx(args_policy_lookup(<<"dead-letter-exchange">>, fun res_arg/2, Q), Q),
- DLXRKey = args_policy_lookup(<<"dead-letter-routing-key">>, fun res_arg/2, Q),
+ DLX = init_dlx(args_policy_lookup(<<"dead-letter-exchange">>,
+ fun res_arg/2, Q), Q),
+ DLXRKey = args_policy_lookup(<<"dead-letter-routing-key">>,
+ fun res_arg/2, Q),
{?MODULE, dead_letter_publish, [DLX, DLXRKey, Q#amqqueue.name]}.
init_dlx(undefined, _Q) ->
@@ -829,9 +836,8 @@ qnode({_, Node}) ->
Node.
check_invalid_arguments(QueueName, Args) ->
- Keys = [<<"x-expires">>, <<"x-message-ttl">>, <<"x-max-length">>,
- <<"x-max-length-bytes">>, <<"x-max-priority">>, <<"x-overflow">>,
- <<"x-queue-mode">>],
+ Keys = [<<"x-expires">>, <<"x-message-ttl">>,
+ <<"x-max-priority">>, <<"x-queue-mode">>, <<"x-overflow">>],
[case rabbit_misc:table_lookup(Args, Key) of
undefined -> ok;
_TypeVal -> rabbit_misc:protocol_error(
diff --git a/test/dynamic_qq_SUITE.erl b/test/dynamic_qq_SUITE.erl
index d1158ef07a..3357e6f74b 100644
--- a/test/dynamic_qq_SUITE.erl
+++ b/test/dynamic_qq_SUITE.erl
@@ -135,9 +135,8 @@ force_delete_if_no_consensus(Config) ->
passive = true})),
%% TODO implement a force delete
BCh2 = rabbit_ct_client_helpers:open_channel(Config, B),
- ?assertExit({{shutdown,
- {connection_closing, {server_initiated_close, 541, _}}}, _},
- amqp_channel:call(BCh2, #'queue.delete'{queue = QName})),
+ ?assertMatch(#'queue.delete_ok'{},
+ amqp_channel:call(BCh2, #'queue.delete'{queue = QName})),
ok.
takeover_on_failure(Config) ->
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl
index dcba910a6a..48dac3ca57 100644
--- a/test/quorum_queue_SUITE.erl
+++ b/test/quorum_queue_SUITE.erl
@@ -22,6 +22,7 @@
-import(quorum_queue_utils, [wait_for_messages_ready/3,
wait_for_messages_pending_ack/3,
+ wait_for_messages_total/3,
dirty_query/3,
ra_name/1]).
@@ -37,6 +38,7 @@ all() ->
groups() ->
[
{single_node, [], all_tests()},
+ {single_node, [], memory_tests()},
{unclustered, [], [
{cluster_size_2, [], [add_member]}
]},
@@ -51,6 +53,7 @@ groups() ->
delete_member_not_found,
delete_member]
++ all_tests()},
+ {cluster_size_2, [], memory_tests()},
{cluster_size_3, [], [
declare_during_node_down,
simple_confirm_availability_on_leader_change,
@@ -61,7 +64,8 @@ groups() ->
delete_declare,
metrics_cleanup_on_leadership_takeover,
metrics_cleanup_on_leader_crash,
- consume_in_minority]},
+ consume_in_minority
+ ]},
{cluster_size_5, [], [start_queue,
start_queue_concurrent,
quorum_cluster_size_3,
@@ -126,6 +130,11 @@ all_tests() ->
consume_redelivery_count,
subscribe_redelivery_count,
message_bytes_metrics,
+ queue_length_limit_drop_head
+ ].
+
+memory_tests() ->
+ [
memory_alarm_rolls_wal
].
@@ -240,7 +249,9 @@ declare_args(Config) ->
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
LQ = ?config(queue_name, Config),
- declare(Ch, LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}]),
+ declare(Ch, LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
+ {<<"x-max-length">>, long, 2000},
+ {<<"x-max-length-bytes">>, long, 2000}]),
assert_queue_type(Server, LQ, quorum),
DQ = <<"classic-declare-args-q">>,
@@ -293,16 +304,6 @@ declare_invalid_args(Config) ->
declare(rabbit_ct_client_helpers:open_channel(Config, Server),
LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
{<<"x-message-ttl">>, long, 2000}])),
- ?assertExit(
- {{shutdown, {server_initiated_close, 406, _}}, _},
- declare(rabbit_ct_client_helpers:open_channel(Config, Server),
- LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
- {<<"x-max-length">>, long, 2000}])),
- ?assertExit(
- {{shutdown, {server_initiated_close, 406, _}}, _},
- declare(rabbit_ct_client_helpers:open_channel(Config, Server),
- LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
- {<<"x-max-length-bytes">>, long, 2000}])),
?assertExit(
{{shutdown, {server_initiated_close, 406, _}}, _},
@@ -314,7 +315,7 @@ declare_invalid_args(Config) ->
{{shutdown, {server_initiated_close, 406, _}}, _},
declare(rabbit_ct_client_helpers:open_channel(Config, Server),
LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
- {<<"x-overflow">>, longstr, <<"drop-head">>}])),
+ {<<"x-overflow">>, longstr, <<"reject-publish">>}])),
?assertExit(
{{shutdown, {server_initiated_close, 406, _}}, _},
@@ -1422,7 +1423,7 @@ metrics_cleanup_on_leadership_takeover(Config) ->
_ -> false
end
end),
- force_leader_change(Leader, Servers, QQ),
+ force_leader_change(Servers, QQ),
wait_until(fun () ->
[] =:= rpc:call(Leader, ets, lookup, [queue_coarse_metrics, QRes]) andalso
[] =:= rpc:call(Leader, ets, lookup, [queue_metrics, QRes])
@@ -2151,6 +2152,32 @@ memory_alarm_rolls_wal(Config) ->
timer:sleep(1000),
ok.
+queue_length_limit_drop_head(Config) ->
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
+ {<<"x-max-length">>, long, 1}])),
+
+ RaName = ra_name(QQ),
+ ok = amqp_channel:cast(Ch,
+ #'basic.publish'{routing_key = QQ},
+ #amqp_msg{props = #'P_basic'{delivery_mode = 2},
+ payload = <<"msg1">>}),
+ ok = amqp_channel:cast(Ch,
+ #'basic.publish'{routing_key = QQ},
+ #amqp_msg{props = #'P_basic'{delivery_mode = 2},
+ payload = <<"msg2">>}),
+ wait_for_consensus(QQ, Config),
+ wait_for_messages_ready(Servers, RaName, 1),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ wait_for_messages_total(Servers, RaName, 1),
+ ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"msg2">>}},
+ amqp_channel:call(Ch, #'basic.get'{queue = QQ,
+ no_ack = true})).
+
%%----------------------------------------------------------------------------
declare(Ch, Q) ->
@@ -2201,6 +2228,9 @@ filter_queues(Expected, Got) ->
lists:member(K, Keys)
end, Got).
+publish_many(Ch, Queue, Count) ->
+ [publish(Ch, Queue) || _ <- lists:seq(1, Count)].
+
publish(Ch, Queue) ->
ok = amqp_channel:cast(Ch,
#'basic.publish'{routing_key = Queue},
@@ -2268,14 +2298,16 @@ wait_until(Condition, N) ->
wait_until(Condition, N - 1)
end.
-force_leader_change(Leader, Servers, Q) ->
+
+force_leader_change([Server | _] = Servers, Q) ->
RaName = ra_name(Q),
+ {ok, _, {_, Leader}} = ra:members({RaName, Server}),
[F1, _] = Servers -- [Leader],
ok = rpc:call(F1, ra, trigger_election, [{RaName, F1}]),
case ra:members({RaName, Leader}) of
{ok, _, {_, Leader}} ->
%% Leader has been re-elected
- force_leader_change(Leader, Servers, Q);
+ force_leader_change(Servers, Q);
{ok, _, _} ->
%% Leader has changed
ok
@@ -2297,3 +2329,8 @@ get_message_bytes(Leader, QRes) ->
_ ->
[]
end.
+
+wait_for_consensus(Name, Config) ->
+ Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+ RaName = ra_name(Name),
+ {ok, _, _} = ra:members({RaName, Server}).
diff --git a/test/quorum_queue_utils.erl b/test/quorum_queue_utils.erl
index a216c220e6..6b820c7b5c 100644
--- a/test/quorum_queue_utils.erl
+++ b/test/quorum_queue_utils.erl
@@ -5,6 +5,7 @@
-export([
wait_for_messages_ready/3,
wait_for_messages_pending_ack/3,
+ wait_for_messages_total/3,
dirty_query/3,
ra_name/1
]).
@@ -17,6 +18,10 @@ wait_for_messages_pending_ack(Servers, QName, Ready) ->
wait_for_messages(Servers, QName, Ready,
fun rabbit_fifo:query_messages_checked_out/1, 60).
+wait_for_messages_total(Servers, QName, Total) ->
+ wait_for_messages(Servers, QName, Total,
+ fun rabbit_fifo:query_messages_total/1, 60).
+
wait_for_messages(Servers, QName, Number, Fun, 0) ->
Msgs = dirty_query(Servers, QName, Fun),
Totals = lists:map(fun(M) when is_map(M) ->
@@ -28,8 +33,8 @@ wait_for_messages(Servers, QName, Number, Fun, 0) ->
wait_for_messages(Servers, QName, Number, Fun, N) ->
Msgs = dirty_query(Servers, QName, Fun),
ct:pal("Got messages ~p", [Msgs]),
- case lists:all(fun(M) when is_map(M) ->
- maps:size(M) == Number;
+ case lists:all(fun(C) when is_integer(C) ->
+ C == Number;
(_) ->
false
end, Msgs) of
diff --git a/test/rabbit_fifo_SUITE.erl b/test/rabbit_fifo_SUITE.erl
index 0512e8161a..9f1f3a4797 100644
--- a/test/rabbit_fifo_SUITE.erl
+++ b/test/rabbit_fifo_SUITE.erl
@@ -300,6 +300,7 @@ returns_after_down(Config) ->
Self ! checkout_done
end),
receive checkout_done -> ok after 1000 -> exit(checkout_done_timeout) end,
+ timer:sleep(1000),
% message should be available for dequeue
{ok, {_, {_, msg1}}, _} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F2),
ra:stop_server(ServerId),
@@ -481,15 +482,15 @@ test_queries(Config) ->
F0 = rabbit_fifo_client:init(ClusterName, [ServerId], 4),
{ok, _} = rabbit_fifo_client:checkout(<<"tag">>, 1, undefined, F0),
{ok, {_, Ready}, _} = ra:local_query(ServerId,
- fun rabbit_fifo:query_messages_ready/1),
- ?assertEqual(1, maps:size(Ready)),
+ fun rabbit_fifo:query_messages_ready/1),
+ ?assertEqual(1, Ready),
ct:pal("Ready ~w~n", [Ready]),
{ok, {_, Checked}, _} = ra:local_query(ServerId,
- fun rabbit_fifo:query_messages_checked_out/1),
- ?assertEqual(1, maps:size(Checked)),
+ fun rabbit_fifo:query_messages_checked_out/1),
+ ?assertEqual(1, Checked),
ct:pal("Checked ~w~n", [Checked]),
{ok, {_, Processes}, _} = ra:local_query(ServerId,
- fun rabbit_fifo:query_processes/1),
+ fun rabbit_fifo:query_processes/1),
ct:pal("Processes ~w~n", [Processes]),
?assertEqual(2, length(Processes)),
P ! stop,
diff --git a/test/rabbit_fifo_prop_SUITE.erl b/test/rabbit_fifo_prop_SUITE.erl
index a8604b46af..5643da1991 100644
--- a/test/rabbit_fifo_prop_SUITE.erl
+++ b/test/rabbit_fifo_prop_SUITE.erl
@@ -25,7 +25,17 @@ all_tests() ->
scenario1,
scenario2,
scenario3,
- scenario4
+ scenario4,
+ scenario5,
+ scenario6,
+ scenario7,
+ scenario8,
+ scenario9,
+ scenario10,
+ scenario11,
+ scenario12,
+ scenario13,
+ scenario14
].
groups() ->
@@ -73,7 +83,7 @@ scenario1(_Config) ->
make_return(C2, [1]), %% E2 in returns E1 with C2
make_settle(C2, [2]) %% E2 with C2
],
- run_snapshot_test(?FUNCTION_NAME, Commands),
+ run_snapshot_test(#{name => ?FUNCTION_NAME}, Commands),
ok.
scenario2(_Config) ->
@@ -88,7 +98,7 @@ scenario2(_Config) ->
make_settle(C1, [0]),
make_settle(C2, [0])
],
- run_snapshot_test(?FUNCTION_NAME, Commands),
+ run_snapshot_test(#{name => ?FUNCTION_NAME}, Commands),
ok.
scenario3(_Config) ->
@@ -102,7 +112,7 @@ scenario3(_Config) ->
make_settle(C1, [1]),
make_settle(C1, [2])
],
- run_snapshot_test(?FUNCTION_NAME, Commands),
+ run_snapshot_test(#{name => ?FUNCTION_NAME}, Commands),
ok.
scenario4(_Config) ->
@@ -112,19 +122,147 @@ scenario4(_Config) ->
make_enqueue(E,1,msg),
make_settle(C1, [0])
],
- run_snapshot_test(?FUNCTION_NAME, Commands),
+ run_snapshot_test(#{name => ?FUNCTION_NAME}, Commands),
+ ok.
+
+scenario5(_Config) ->
+ C1 = {<<>>, c:pid(0,505,0)},
+ E = c:pid(0,465,9),
+ Commands = [make_enqueue(E,1,<<0>>),
+ make_checkout(C1, {auto,1,simple_prefetch}),
+ make_enqueue(E,2,<<>>),
+ make_settle(C1,[0])],
+ run_snapshot_test(#{name => ?FUNCTION_NAME}, Commands),
+ ok.
+
+scenario6(_Config) ->
+ E = c:pid(0,465,9),
+ Commands = [make_enqueue(E,1,<<>>), %% 1 msg on queue - snap: prefix 1
+ make_enqueue(E,2,<<>>) %% 1. msg on queue - snap: prefix 1
+ ],
+ run_snapshot_test(#{name => ?FUNCTION_NAME,
+ max_length => 1}, Commands),
+ ok.
+
+scenario7(_Config) ->
+ C1 = {<<>>, c:pid(0,208,0)},
+ E = c:pid(0,188,0),
+ Commands = [
+ make_enqueue(E,1,<<>>),
+ make_checkout(C1, {auto,1,simple_prefetch}),
+ make_enqueue(E,2,<<>>),
+ make_enqueue(E,3,<<>>),
+ make_settle(C1,[0])],
+ run_snapshot_test(#{name => ?FUNCTION_NAME,
+ max_length => 1}, Commands),
+ ok.
+
+scenario8(_Config) ->
+ C1 = {<<>>, c:pid(0,208,0)},
+ E = c:pid(0,188,0),
+ Commands = [
+ make_enqueue(E,1,<<>>),
+ make_enqueue(E,2,<<>>),
+ make_checkout(C1, {auto,1,simple_prefetch}),
+ % make_checkout(C1, cancel),
+ {down, E, noconnection},
+ make_settle(C1, [0])],
+ run_snapshot_test(#{name => ?FUNCTION_NAME,
+ max_length => 1}, Commands),
+ ok.
+
+scenario9(_Config) ->
+ E = c:pid(0,188,0),
+ Commands = [
+ make_enqueue(E,1,<<>>),
+ make_enqueue(E,2,<<>>),
+ make_enqueue(E,3,<<>>)],
+ run_snapshot_test(#{name => ?FUNCTION_NAME,
+ max_length => 1}, Commands),
+ ok.
+
+scenario10(_Config) ->
+ C1 = {<<>>, c:pid(0,208,0)},
+ E = c:pid(0,188,0),
+ Commands = [
+ make_checkout(C1, {auto,1,simple_prefetch}),
+ make_enqueue(E,1,<<>>),
+ make_settle(C1, [0])
+ ],
+ run_snapshot_test(#{name => ?FUNCTION_NAME,
+ max_length => 1}, Commands),
+ ok.
+
+scenario11(_Config) ->
+ C1 = {<<>>, c:pid(0,215,0)},
+ E = c:pid(0,217,0),
+ Commands = [
+ make_enqueue(E,1,<<>>),
+ make_checkout(C1, {auto,1,simple_prefetch}),
+ make_checkout(C1, cancel),
+ make_enqueue(E,2,<<>>),
+ make_checkout(C1, {auto,1,simple_prefetch}),
+ make_settle(C1, [0]),
+ make_checkout(C1, cancel)
+ ],
+ run_snapshot_test(#{name => ?FUNCTION_NAME,
+ max_length => 2}, Commands),
+ ok.
+
+scenario12(_Config) ->
+ E = c:pid(0,217,0),
+ Commands = [make_enqueue(E,1,<<0>>),
+ make_enqueue(E,2,<<0>>),
+ make_enqueue(E,3,<<0>>)],
+ run_snapshot_test(#{name => ?FUNCTION_NAME,
+ max_bytes => 2}, Commands),
+ ok.
+
+scenario13(_Config) ->
+ E = c:pid(0,217,0),
+ Commands = [make_enqueue(E,1,<<0>>),
+ make_enqueue(E,2,<<>>),
+ make_enqueue(E,3,<<>>),
+ make_enqueue(E,4,<<>>)
+ ],
+ run_snapshot_test(#{name => ?FUNCTION_NAME,
+ max_length => 2}, Commands),
+ ok.
+
+scenario14(_Config) ->
+ E = c:pid(0,217,0),
+ Commands = [make_enqueue(E,1,<<0,0>>)],
+ run_snapshot_test(#{name => ?FUNCTION_NAME,
+ max_bytes => 1}, Commands),
ok.
snapshots(_Config) ->
run_proper(
fun () ->
- ?FORALL(O, ?LET(Ops, log_gen(), expand(Ops)),
- test1_prop(O))
- end, [], 1000).
-
-test1_prop(Commands) ->
- ct:pal("Commands: ~p~n", [Commands]),
- try run_snapshot_test(?FUNCTION_NAME, Commands) of
+ ?FORALL({Length, Bytes, SingleActiveConsumer},
+ frequency([{10, {0, 0, false}},
+ {5, {non_neg_integer(), non_neg_integer(),
+ boolean()}}]),
+ ?FORALL(O, ?LET(Ops, log_gen(200), expand(Ops)),
+ collect({Length, Bytes},
+ snapshots_prop(
+ config(?FUNCTION_NAME,
+ Length, Bytes,
+ SingleActiveConsumer), O))))
+ end, [], 2000).
+
+config(Name, Length, Bytes, SingleActive) ->
+ #{name => Name,
+ max_length => map_max(Length),
+ max_bytes => map_max(Bytes),
+ single_active_consumer_on => SingleActive}.
+
+map_max(0) -> undefined;
+map_max(N) -> N.
+
+snapshots_prop(Conf, Commands) ->
+ ct:pal("Commands: ~p~nConf~p~n", [Commands, Conf]),
+ try run_snapshot_test(Conf, Commands) of
_ -> true
catch
Err ->
@@ -132,10 +270,10 @@ test1_prop(Commands) ->
false
end.
-log_gen() ->
+log_gen(Size) ->
?LET(EPids, vector(2, pid_gen()),
?LET(CPids, vector(2, pid_gen()),
- resize(200,
+ resize(Size,
list(
frequency(
[{20, enqueue_gen(oneof(EPids))},
@@ -157,15 +295,17 @@ down_gen(Pid) ->
?LET(E, {down, Pid, oneof([noconnection, noproc])}, E).
enqueue_gen(Pid) ->
- ?LET(E, {enqueue, Pid, frequency([{10, enqueue},
- {1, delay}])}, E).
+ ?LET(E, {enqueue, Pid,
+ frequency([{10, enqueue},
+ {1, delay}]),
+ binary()}, E).
checkout_cancel_gen(Pid) ->
{checkout, Pid, cancel}.
checkout_gen(Pid) ->
%% pid, tag, prefetch
- ?LET(C, {checkout, {binary(), Pid}, choose(1, 10)}, C).
+ ?LET(C, {checkout, {binary(), Pid}, choose(1, 100)}, C).
-record(t, {state = rabbit_fifo:init(#{name => proper,
@@ -193,9 +333,10 @@ expand(Ops) ->
lists:reverse(Log).
-handle_op({enqueue, Pid, When}, #t{enqueuers = Enqs0,
- down = Down,
- effects = Effs} = T) ->
+handle_op({enqueue, Pid, When, Data},
+ #t{enqueuers = Enqs0,
+ down = Down,
+ effects = Effs} = T) ->
case Down of
#{Pid := noproc} ->
%% if it's a noproc then it cannot exist - can it?
@@ -204,13 +345,12 @@ handle_op({enqueue, Pid, When}, #t{enqueuers = Enqs0,
_ ->
Enqs = maps:update_with(Pid, fun (Seq) -> Seq + 1 end, 1, Enqs0),
MsgSeq = maps:get(Pid, Enqs),
- Cmd = rabbit_fifo:make_enqueue(Pid, MsgSeq, msg),
+ Cmd = rabbit_fifo:make_enqueue(Pid, MsgSeq, Data),
case When of
enqueue ->
do_apply(Cmd, T#t{enqueuers = Enqs});
delay ->
%% just put the command on the effects queue
- ct:pal("delaying ~w", [Cmd]),
T#t{effects = queue:in(Cmd, Effs)}
end
end;
@@ -308,7 +448,6 @@ enq_effs([{send_msg, P, {delivery, CTag, Msgs}, ra_event} | Rem], Q) ->
Cmd = rabbit_fifo:make_settle({CTag, P}, MsgIds),
enq_effs(Rem, queue:in(Cmd, Q));
enq_effs([_ | Rem], Q) ->
- % ct:pal("enq_effs dropping ~w~n", [E]),
enq_effs(Rem, Q).
@@ -323,29 +462,40 @@ run_proper(Fun, Args, NumTests) ->
(F, A) -> ct:pal(?LOW_IMPORTANCE, F, A)
end}])).
-run_snapshot_test(Name, Commands) ->
+run_snapshot_test(Conf, Commands) ->
%% create every incremental permutation of the commands lists
%% and run the snapshot tests against that
[begin
% ?debugFmt("~w running command to ~w~n", [?FUNCTION_NAME, lists:last(C)]),
- run_snapshot_test0(Name, C)
+ run_snapshot_test0(Conf, C)
end || C <- prefixes(Commands, 1, [])].
-run_snapshot_test0(Name, Commands) ->
+run_snapshot_test0(Conf, Commands) ->
Indexes = lists:seq(1, length(Commands)),
Entries = lists:zip(Indexes, Commands),
- {State, Effects} = run_log(test_init(Name), Entries),
+ {State, Effects} = run_log(test_init(Conf), Entries),
+ % ct:pal("beginning snapshot test run for ~w num commands ~b",
+ % [maps:get(name, Conf), length(Commands)]),
[begin
+ %% drop all entries below and including the snapshot
Filtered = lists:dropwhile(fun({X, _}) when X =< SnapIdx -> true;
(_) -> false
end, Entries),
{S, _} = run_log(SnapState, Filtered),
% assert log can be restored from any release cursor index
- ?assertEqual(State, S)
+ case S of
+ State -> ok;
+ _ ->
+ ct:pal("Snapshot tests failed run log:~n"
+ "~p~n from ~n~p~n Entries~n~p~n",
+ [Filtered, SnapState, Entries]),
+ ?assertEqual(State, S)
+ end
end || {release_cursor, SnapIdx, SnapState} <- Effects],
ok.
+%% transforms [1,2,3] into [[1,2,3], [1,2], [1]]
prefixes(Source, N, Acc) when N > length(Source) ->
lists:reverse(Acc);
prefixes(Source, N, Acc) ->
@@ -364,11 +514,12 @@ run_log(InitState, Entries) ->
end
end, {InitState, []}, Entries).
-test_init(Name) ->
- rabbit_fifo:init(#{name => Name,
- queue_resource => blah,
- shadow_copy_interval => 0,
- metrics_handler => {?MODULE, metrics_handler, []}}).
+test_init(Conf) ->
+ Default = #{queue_resource => blah,
+ shadow_copy_interval => 0,
+ metrics_handler => {?MODULE, metrics_handler, []}},
+ rabbit_fifo:init(maps:merge(Default, Conf)).
+
meta(Idx) ->
#{index => Idx, term => 1}.
diff --git a/test/unit_inbroker_parallel_SUITE.erl b/test/unit_inbroker_parallel_SUITE.erl
index 0343e7d136..581440d179 100644
--- a/test/unit_inbroker_parallel_SUITE.erl
+++ b/test/unit_inbroker_parallel_SUITE.erl
@@ -35,9 +35,15 @@ all() ->
].
groups() ->
- MaxLengthTests = [max_length_drop_head,
+ MaxLengthTests = [max_length_default,
+ max_length_bytes_default,
+ max_length_drop_head,
+ max_length_bytes_drop_head,
max_length_reject_confirm,
- max_length_drop_publish],
+ max_length_bytes_reject_confirm,
+ max_length_drop_publish,
+ max_length_drop_publish_requeue,
+ max_length_bytes_drop_publish],
[
{parallel_tests, [parallel], [
amqp_connection_refusal,
@@ -59,11 +65,16 @@ groups() ->
set_disk_free_limit_command,
set_vm_memory_high_watermark_command,
topic_matching,
+ max_message_size,
+
{queue_max_length, [], [
- {max_length_simple, [], MaxLengthTests},
- {max_length_mirrored, [], MaxLengthTests}]},
- max_message_size
- ]}
+ {max_length_classic, [], MaxLengthTests},
+ {max_length_quorum, [], [max_length_default,
+ max_length_bytes_default]
+ },
+ {max_length_mirrored, [], MaxLengthTests}
+ ]}
+ ]}
].
suite() ->
@@ -82,10 +93,23 @@ init_per_suite(Config) ->
end_per_suite(Config) ->
rabbit_ct_helpers:run_teardown_steps(Config).
+init_per_group(max_length_classic, Config) ->
+ rabbit_ct_helpers:set_config(
+ Config,
+ [{queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]},
+ {queue_durable, false}]);
+init_per_group(max_length_quorum, Config) ->
+ rabbit_ct_helpers:set_config(
+ Config,
+ [{queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}]},
+ {queue_durable, true}]);
init_per_group(max_length_mirrored, Config) ->
rabbit_ct_broker_helpers:set_ha_policy(Config, 0, <<"^max_length.*queue">>,
<<"all">>, [{<<"ha-sync-mode">>, <<"automatic">>}]),
- Config1 = rabbit_ct_helpers:set_config(Config, [{is_mirrored, true}]),
+ Config1 = rabbit_ct_helpers:set_config(
+ Config, [{is_mirrored, true},
+ {queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]},
+ {queue_durable, false}]),
rabbit_ct_helpers:run_steps(Config1, []);
init_per_group(Group, Config) ->
case lists:member({group, Group}, all()) of
@@ -132,29 +156,22 @@ end_per_group(Group, Config) ->
end.
init_per_testcase(Testcase, Config) ->
- rabbit_ct_helpers:testcase_started(Config, Testcase).
-
-end_per_testcase(max_length_drop_head = Testcase, Config) ->
+ Group = proplists:get_value(name, ?config(tc_group_properties, Config)),
+ Q = rabbit_data_coercion:to_binary(io_lib:format("~p_~p", [Group, Testcase])),
+ Config1 = rabbit_ct_helpers:set_config(Config, [{queue_name, Q}]),
+ rabbit_ct_helpers:testcase_started(Config1, Testcase).
+
+end_per_testcase(Testcase, Config)
+ when Testcase == max_length_drop_publish; Testcase == max_length_bytes_drop_publish;
+ Testcase == max_length_drop_publish_requeue;
+ Testcase == max_length_reject_confirm; Testcase == max_length_bytes_reject_confirm;
+ Testcase == max_length_drop_head; Testcase == max_length_bytes_drop_head;
+ Testcase == max_length_default; Testcase == max_length_bytes_default ->
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
- amqp_channel:call(Ch, #'queue.delete'{queue = <<"max_length_drop_head_queue">>}),
- amqp_channel:call(Ch, #'queue.delete'{queue = <<"max_length_default_drop_head_queue">>}),
- amqp_channel:call(Ch, #'queue.delete'{queue = <<"max_length_bytes_drop_head_queue">>}),
- amqp_channel:call(Ch, #'queue.delete'{queue = <<"max_length_bytes_default_drop_head_queue">>}),
+ amqp_channel:call(Ch, #'queue.delete'{queue = ?config(queue_name, Config)}),
rabbit_ct_client_helpers:close_channels_and_connection(Config, 0),
rabbit_ct_helpers:testcase_finished(Config, Testcase);
-end_per_testcase(max_length_reject_confirm = Testcase, Config) ->
- {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
- amqp_channel:call(Ch, #'queue.delete'{queue = <<"max_length_reject_queue">>}),
- amqp_channel:call(Ch, #'queue.delete'{queue = <<"max_length_bytes_reject_queue">>}),
- rabbit_ct_client_helpers:close_channels_and_connection(Config, 0),
- rabbit_ct_helpers:testcase_finished(Config, Testcase);
-end_per_testcase(max_length_drop_publish = Testcase, Config) ->
- {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
- amqp_channel:call(Ch, #'queue.delete'{queue = <<"max_length_drop_publish_queue">>}),
- amqp_channel:call(Ch, #'queue.delete'{queue = <<"max_length_bytes_drop_publish_queue">>}),
- rabbit_ct_client_helpers:close_channels_and_connection(Config, 0),
- rabbit_ct_helpers:testcase_finished(Config, Testcase);
end_per_testcase(Testcase, Config) ->
rabbit_ct_helpers:testcase_finished(Config, Testcase).
@@ -1159,43 +1176,66 @@ set_vm_memory_high_watermark_command1(_Config) ->
)
end.
-max_length_drop_head(Config) ->
+max_length_bytes_drop_head(Config) ->
+ max_length_bytes_drop_head(Config, [{<<"x-overflow">>, longstr, <<"drop-head">>}]).
+
+max_length_bytes_default(Config) ->
+ max_length_bytes_drop_head(Config, []).
+
+max_length_bytes_drop_head(Config, ExtraArgs) ->
{_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
- QName = <<"max_length_drop_head_queue">>,
- QNameDefault = <<"max_length_default_drop_head_queue">>,
- QNameBytes = <<"max_length_bytes_drop_head_queue">>,
- QNameDefaultBytes = <<"max_length_bytes_default_drop_head_queue">>,
+ Args = ?config(queue_args, Config),
+ Durable = ?config(queue_durable, Config),
+ QName = ?config(queue_name, Config),
- MaxLengthArgs = [{<<"x-max-length">>, long, 1}],
MaxLengthBytesArgs = [{<<"x-max-length-bytes">>, long, 100}],
- OverflowArgs = [{<<"x-overflow">>, longstr, <<"drop-head">>}],
- #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = MaxLengthArgs ++ OverflowArgs}),
- #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QNameDefault, arguments = MaxLengthArgs}),
- #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QNameBytes, arguments = MaxLengthBytesArgs ++ OverflowArgs}),
- #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QNameDefaultBytes, arguments = MaxLengthBytesArgs}),
-
- check_max_length_drops_head(Config, QName, Ch, <<"1">>, <<"2">>, <<"3">>),
- check_max_length_drops_head(Config, QNameDefault, Ch, <<"1">>, <<"2">>, <<"3">>),
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = MaxLengthBytesArgs ++ Args ++ ExtraArgs, durable = Durable}),
%% 80 bytes payload
Payload1 = << <<"1">> || _ <- lists:seq(1, 80) >>,
Payload2 = << <<"2">> || _ <- lists:seq(1, 80) >>,
Payload3 = << <<"3">> || _ <- lists:seq(1, 80) >>,
- check_max_length_drops_head(Config, QName, Ch, Payload1, Payload2, Payload3),
- check_max_length_drops_head(Config, QNameDefault, Ch, Payload1, Payload2, Payload3).
+ check_max_length_drops_head(Config, QName, Ch, Payload1, Payload2, Payload3).
+
+max_length_drop_head(Config) ->
+ max_length_drop_head(Config, [{<<"x-overflow">>, longstr, <<"drop-head">>}]).
+
+max_length_default(Config) ->
+ %% Defaults to drop_head
+ max_length_drop_head(Config, []).
+
+max_length_drop_head(Config, ExtraArgs) ->
+ {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ Args = ?config(queue_args, Config),
+ Durable = ?config(queue_durable, Config),
+ QName = ?config(queue_name, Config),
+
+ MaxLengthArgs = [{<<"x-max-length">>, long, 1}],
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = MaxLengthArgs ++ Args ++ ExtraArgs, durable = Durable}),
+
+ check_max_length_drops_head(Config, QName, Ch, <<"1">>, <<"2">>, <<"3">>).
max_length_reject_confirm(Config) ->
{_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
- QName = <<"max_length_reject_queue">>,
- QNameBytes = <<"max_length_bytes_reject_queue">>,
+ Args = ?config(queue_args, Config),
+ QName = ?config(queue_name, Config),
+ Durable = ?config(queue_durable, Config),
MaxLengthArgs = [{<<"x-max-length">>, long, 1}],
- MaxLengthBytesArgs = [{<<"x-max-length-bytes">>, long, 100}],
OverflowArgs = [{<<"x-overflow">>, longstr, <<"reject-publish">>}],
- #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = MaxLengthArgs ++ OverflowArgs}),
- #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QNameBytes, arguments = MaxLengthBytesArgs ++ OverflowArgs}),
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = MaxLengthArgs ++ OverflowArgs ++ Args, durable = Durable}),
#'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
check_max_length_drops_publish(Config, QName, Ch, <<"1">>, <<"2">>, <<"3">>),
- check_max_length_rejects(Config, QName, Ch, <<"1">>, <<"2">>, <<"3">>),
+ check_max_length_rejects(Config, QName, Ch, <<"1">>, <<"2">>, <<"3">>).
+
+max_length_bytes_reject_confirm(Config) ->
+ {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ Args = ?config(queue_args, Config),
+ QNameBytes = ?config(queue_name, Config),
+ Durable = ?config(queue_durable, Config),
+ MaxLengthBytesArgs = [{<<"x-max-length-bytes">>, long, 100}],
+ OverflowArgs = [{<<"x-overflow">>, longstr, <<"reject-publish">>}],
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QNameBytes, arguments = MaxLengthBytesArgs ++ OverflowArgs ++ Args, durable = Durable}),
+ #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
%% 80 bytes payload
Payload1 = << <<"1">> || _ <- lists:seq(1, 80) >>,
@@ -1207,15 +1247,55 @@ max_length_reject_confirm(Config) ->
max_length_drop_publish(Config) ->
{_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
- QName = <<"max_length_drop_publish_queue">>,
- QNameBytes = <<"max_length_bytes_drop_publish_queue">>,
+ Args = ?config(queue_args, Config),
+ Durable = ?config(queue_durable, Config),
+ QName = ?config(queue_name, Config),
MaxLengthArgs = [{<<"x-max-length">>, long, 1}],
- MaxLengthBytesArgs = [{<<"x-max-length-bytes">>, long, 100}],
OverflowArgs = [{<<"x-overflow">>, longstr, <<"reject-publish">>}],
- #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = MaxLengthArgs ++ OverflowArgs}),
- #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QNameBytes, arguments = MaxLengthBytesArgs ++ OverflowArgs}),
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = MaxLengthArgs ++ OverflowArgs ++ Args, durable = Durable}),
%% If confirms are not enable, publishes will still be dropped in reject-publish mode.
- check_max_length_drops_publish(Config, QName, Ch, <<"1">>, <<"2">>, <<"3">>),
+ check_max_length_drops_publish(Config, QName, Ch, <<"1">>, <<"2">>, <<"3">>).
+
+max_length_drop_publish_requeue(Config) ->
+ {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ Args = ?config(queue_args, Config),
+ Durable = ?config(queue_durable, Config),
+ QName = ?config(queue_name, Config),
+ MaxLengthArgs = [{<<"x-max-length">>, long, 1}],
+ OverflowArgs = [{<<"x-overflow">>, longstr, <<"reject-publish">>}],
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = MaxLengthArgs ++ OverflowArgs ++ Args, durable = Durable}),
+ %% If confirms are not enabled, publishes will still be dropped in reject-publish mode.
+ check_max_length_requeue(Config, QName, Ch, <<"1">>, <<"2">>).
+
+check_max_length_requeue(Config, QName, Ch, Payload1, Payload2) ->
+ sync_mirrors(QName, Config),
+ #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
+ %% A single message is published and consumed
+ amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}),
+ wait_for_consensus(QName, Config),
+ {#'basic.get_ok'{delivery_tag = DeliveryTag},
+ #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
+ #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
+
+ %% Another message is published
+ amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}),
+ wait_for_consensus(QName, Config),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
+ multiple = false,
+ requeue = true}),
+ wait_for_consensus(QName, Config),
+ {#'basic.get_ok'{}, #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
+ {#'basic.get_ok'{}, #amqp_msg{payload = Payload2}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
+ #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}).
+
+max_length_bytes_drop_publish(Config) ->
+ {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ Args = ?config(queue_args, Config),
+ Durable = ?config(queue_durable, Config),
+ QNameBytes = ?config(queue_name, Config),
+ MaxLengthBytesArgs = [{<<"x-max-length-bytes">>, long, 100}],
+ OverflowArgs = [{<<"x-overflow">>, longstr, <<"reject-publish">>}],
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QNameBytes, arguments = MaxLengthBytesArgs ++ OverflowArgs ++ Args, durable = Durable}),
%% 80 bytes payload
Payload1 = << <<"1">> || _ <- lists:seq(1, 80) >>,
@@ -1229,22 +1309,38 @@ check_max_length_drops_publish(Config, QName, Ch, Payload1, Payload2, Payload3)
#'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
%% A single message is published and consumed
amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}),
+ wait_for_consensus(QName, Config),
{#'basic.get_ok'{}, #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
#'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
%% Message 2 is dropped, message 1 stays
amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}),
+ wait_for_consensus(QName, Config),
amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}),
+ wait_for_consensus(QName, Config),
{#'basic.get_ok'{}, #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
#'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
%% Messages 2 and 3 are dropped, message 1 stays
amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}),
+ wait_for_consensus(QName, Config),
amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}),
+ wait_for_consensus(QName, Config),
amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload3}),
+ wait_for_consensus(QName, Config),
{#'basic.get_ok'{}, #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
#'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}).
+wait_for_consensus(QName, Config) ->
+ case lists:keyfind(<<"x-queue-type">>, 1, ?config(queue_args, Config)) of
+ {_, _, <<"quorum">>} ->
+ Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+ RaName = binary_to_atom(<<"%2F_", QName/binary>>, utf8),
+ {ok, _, _} = ra:members({RaName, Server});
+ _ ->
+ ok
+ end.
+
check_max_length_rejects(Config, QName, Ch, Payload1, Payload2, Payload3) ->
sync_mirrors(QName, Config),
amqp_channel:register_confirm_handler(Ch, self()),
@@ -1283,12 +1379,14 @@ check_max_length_drops_head(Config, QName, Ch, Payload1, Payload2, Payload3) ->
#'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
%% A single message is published and consumed
amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}),
+ wait_for_consensus(QName, Config),
{#'basic.get_ok'{}, #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
#'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
%% Message 1 is replaced by message 2
amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}),
amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}),
+ wait_for_consensus(QName, Config),
{#'basic.get_ok'{}, #amqp_msg{payload = Payload2}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
#'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
@@ -1296,6 +1394,7 @@ check_max_length_drops_head(Config, QName, Ch, Payload1, Payload2, Payload3) ->
amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}),
amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}),
amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload3}),
+ wait_for_consensus(QName, Config),
{#'basic.get_ok'{}, #amqp_msg{payload = Payload3}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
#'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}).