summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDiana Corbacho <diana@rabbitmq.com>2019-01-15 16:38:36 +0000
committerkjnilsson <knilsson@pivotal.io>2019-01-24 14:44:27 +0000
commit08bded2fa243a15a9c7ced2b040066ad586104f1 (patch)
tree5ad118b2cd3238605038a52ffcbd6a6711e2f026
parent267755365ab4fa2fd2568d86399806c442c7fd61 (diff)
downloadrabbitmq-server-git-08bded2fa243a15a9c7ced2b040066ad586104f1.tar.gz
Quorum queue queue length limit by byte size and number of messages
Only drop-head strategy. This necessitated the change of rabbit_fifo prefix messages from a tuple of integers representing the number of returned vs enqueued messages that have already been processes and thus don't need to include message bodes in the snapshot to a tuple of lists of the sizes of each message. This change will have some performance impact as the snaphots will now be larger than before but as they still won't contain message bodies at least the sizing is fixed. Decreased the frequency as snapshots points are prepared so somewhat make up for this. [#161247380]
-rw-r--r--src/rabbit_fifo.erl548
-rw-r--r--src/rabbit_fifo_index.erl26
-rw-r--r--src/rabbit_quorum_queue.erl16
-rw-r--r--test/dynamic_qq_SUITE.erl5
-rw-r--r--test/quorum_queue_SUITE.erl69
-rw-r--r--test/quorum_queue_utils.erl9
-rw-r--r--test/rabbit_fifo_SUITE.erl11
-rw-r--r--test/rabbit_fifo_prop_SUITE.erl217
-rw-r--r--test/unit_inbroker_parallel_SUITE.erl207
9 files changed, 764 insertions, 344 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index b62ff62a51..9752e06e34 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -38,6 +38,7 @@
% queries
query_messages_ready/1,
query_messages_checked_out/1,
+ query_messages_total/1,
query_processes/1,
query_ra_indexes/1,
query_consumer_count/1,
@@ -87,8 +88,13 @@
-type msg() :: {msg_header(), raw_msg()}.
%% message with a header map.
+-type msg_size() :: non_neg_integer().
+%% the size in bytes of the msg payload
+
-type indexed_msg() :: {ra_index(), msg()}.
+-type prefix_msg() :: {'$prefix_msg', msg_size()}.
+
-type delivery_msg() :: {msg_id(), msg()}.
%% A tuple consisting of the message id and the headered message.
@@ -157,7 +163,7 @@
-type applied_mfa() :: {module(), atom(), list()}.
% represents a partially applied module call
--define(SHADOW_COPY_INTERVAL, 4096 * 4).
+-define(SHADOW_COPY_INTERVAL, 4096 * 8).
-define(USE_AVG_HALF_LIFE, 10000.0).
-record(consumer,
@@ -189,6 +195,7 @@
suspected_down = false :: boolean()
}).
+
-record(state,
{name :: atom(),
queue_resource :: rabbit_types:r('queue'),
@@ -202,7 +209,8 @@
next_msg_num = 1 :: msg_in_id(),
% list of returned msg_in_ids - when checking out it picks from
% this list first before taking low_msg_num
- returns = lqueue:new() :: lqueue:lqueue('$prefix_msg' | msg_in_id()),
+ returns = lqueue:new() :: lqueue:lqueue(prefix_msg() |
+ {msg_in_id(), indexed_msg()}),
% a counter of enqueues - used to trigger shadow copy points
enqueue_count = 0 :: non_neg_integer(),
% a map containing all the live processes that have ever enqueued
@@ -224,19 +232,20 @@
dead_letter_handler :: maybe(applied_mfa()),
become_leader_handler :: maybe(applied_mfa()),
%% This is a special field that is only used for snapshots
- %% It represents the number of queued messages at the time the
+ %% It represents the queued messages at the time the
%% dehydrated snapshot state was cached.
%% As release_cursors are only emitted for raft indexes where all
%% prior messages no longer contribute to the current state we can
- %% replace all message payloads at some index with a single integer
- %% to be decremented during `checkout_one' until it's 0 after which
- %% it instead takes messages from the `messages' map.
+ %% replace all message payloads with their sizes (to be used for
+ %% overflow calculations).
%% This is done so that consumers are still served in a deterministic
%% order on recovery.
- prefix_msg_counts = {0, 0} :: {Return :: non_neg_integer(),
- PrefixMsgs :: non_neg_integer()},
+ prefix_msgs = {[], []} :: {Return :: [msg_size()],
+ PrefixMsgs :: [msg_size()]},
msg_bytes_enqueue = 0 :: non_neg_integer(),
msg_bytes_checkout = 0 :: non_neg_integer(),
+ max_length :: maybe(non_neg_integer()),
+ max_bytes :: maybe(non_neg_integer()),
%% whether single active consumer is on or not for this queue
consumer_strategy = default :: default | single_active,
%% waiting consumers, one is picked active consumer is cancelled or dies
@@ -251,6 +260,8 @@
dead_letter_handler => applied_mfa(),
become_leader_handler => applied_mfa(),
shadow_copy_interval => non_neg_integer(),
+ max_length => non_neg_integer(),
+ max_bytes => non_neg_integer(),
single_active_consumer_on => boolean()}.
-export_type([protocol/0,
@@ -272,12 +283,14 @@
init(#{name := Name,
queue_resource := Resource} = Conf) ->
update_config(Conf, #state{name = Name,
- queue_resource = Resource}).
+ queue_resource = Resource}).
update_config(Conf, State) ->
DLH = maps:get(dead_letter_handler, Conf, undefined),
BLH = maps:get(become_leader_handler, Conf, undefined),
SHI = maps:get(shadow_copy_interval, Conf, ?SHADOW_COPY_INTERVAL),
+ MaxLength = maps:get(max_length, Conf, undefined),
+ MaxBytes = maps:get(max_bytes, Conf, undefined),
ConsumerStrategy = case maps:get(single_active_consumer_on, Conf, false) of
true ->
single_active;
@@ -287,6 +300,8 @@ update_config(Conf, State) ->
State#state{dead_letter_handler = DLH,
become_leader_handler = BLH,
shadow_copy_interval = SHI,
+ max_length = MaxLength,
+ max_bytes = MaxBytes,
consumer_strategy = ConsumerStrategy}.
zero(_) ->
@@ -294,59 +309,49 @@ zero(_) ->
% msg_ids are scoped per consumer
% ra_indexes holds all raft indexes for enqueues currently on queue
--spec apply(ra_machine:command_meta_data(), command(),
- state()) ->
- {state(), Reply :: term(), ra_machine:effects()}.
-apply(#{index := RaftIdx}, #enqueue{pid = From, seq = Seq,
- msg = RawMsg}, State00) ->
- case maybe_enqueue(RaftIdx, From, Seq, RawMsg, [], State00) of
- {ok, State0, Effects1} ->
- %% need to checkout before capturing the shadow copy else
- %% snapshots may not be complete
- {State, ok, Effects} = checkout(
- add_bytes_enqueue(RawMsg, State0),
- Effects1),
- append_to_master_index(RaftIdx, Effects, State);
- {duplicate, State, Effects} ->
- {State, ok, lists:reverse(Effects)}
- end;
-apply(#{index := RaftIdx},
+-spec apply(ra_machine:command_meta_data(), command(), state()) ->
+ {state(), Reply :: term(), ra_machine:effects()} |
+ {state(), Reply :: term()}.
+apply(Metadata, #enqueue{pid = From, seq = Seq,
+ msg = RawMsg}, State00) ->
+ apply_enqueue(Metadata, From, Seq, RawMsg, State00);
+apply(Meta,
#settle{msg_ids = MsgIds, consumer_id = ConsumerId},
#state{consumers = Cons0} = State) ->
case Cons0 of
#{ConsumerId := Con0} ->
% need to increment metrics before completing as any snapshot
% states taken need to includ them
- complete_and_checkout(RaftIdx, MsgIds, ConsumerId,
+ complete_and_checkout(Meta, MsgIds, ConsumerId,
Con0, [], State);
_ ->
{State, ok}
end;
-apply(#{index := RaftIdx}, #discard{msg_ids = MsgIds, consumer_id = ConsumerId},
+apply(Meta, #discard{msg_ids = MsgIds, consumer_id = ConsumerId},
#state{consumers = Cons0} = State0) ->
case Cons0 of
#{ConsumerId := Con0} ->
Discarded = maps:with(MsgIds, Con0#consumer.checked_out),
Effects = dead_letter_effects(Discarded, State0, []),
- complete_and_checkout(RaftIdx, MsgIds, ConsumerId, Con0,
+ complete_and_checkout(Meta, MsgIds, ConsumerId, Con0,
Effects, State0);
_ ->
{State0, ok}
end;
-apply(_, #return{msg_ids = MsgIds, consumer_id = ConsumerId},
+apply(Meta, #return{msg_ids = MsgIds, consumer_id = ConsumerId},
#state{consumers = Cons0} = State) ->
case Cons0 of
#{ConsumerId := Con0 = #consumer{checked_out = Checked0}} ->
Checked = maps:without(MsgIds, Checked0),
Returned = maps:with(MsgIds, Checked0),
MsgNumMsgs = maps:values(Returned),
- return(ConsumerId, MsgNumMsgs, Con0, Checked, [], State);
+ return(Meta, ConsumerId, MsgNumMsgs, Con0, Checked, [], State);
_ ->
{State, ok}
end;
-apply(_, #credit{credit = NewCredit, delivery_count = RemoteDelCnt,
- drain = Drain, consumer_id = ConsumerId},
+apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt,
+ drain = Drain, consumer_id = ConsumerId},
#state{consumers = Cons0,
service_queue = ServiceQueue0} = State0) ->
case Cons0 of
@@ -359,7 +364,7 @@ apply(_, #credit{credit = NewCredit, delivery_count = RemoteDelCnt,
ServiceQueue0),
Cons = maps:put(ConsumerId, Con1, Cons0),
{State1, ok, Effects} =
- checkout(State0#state{service_queue = ServiceQueue,
+ checkout(Meta, State0#state{service_queue = ServiceQueue,
consumers = Cons}, []),
Response = {send_credit_reply, maps:size(State1#state.messages)},
%% by this point all checkouts for the updated credit value
@@ -389,46 +394,45 @@ apply(_, #credit{credit = NewCredit, delivery_count = RemoteDelCnt,
%% credit for unknown consumer - just ignore
{State0, ok}
end;
-apply(_, #checkout{spec = {dequeue, _}},
- #state{messages = M,
- prefix_msg_counts = {0, 0}} = State0) when map_size(M) == 0 ->
- %% FIXME: also check if there are returned messages
- %% TODO do we need metric visibility of empty get requests?
- {State0, {dequeue, empty}};
-apply(Meta, #checkout{spec = {dequeue, settled}, meta = ConsumerMeta,
+apply(Meta, #checkout{spec = {dequeue, Settlement},
+ meta = ConsumerMeta,
consumer_id = ConsumerId},
- State0) ->
- % TODO: this clause could probably be optimised
- State1 = update_consumer(ConsumerId, ConsumerMeta,
- {once, 1, simple_prefetch}, State0),
- % turn send msg effect into reply
- {success, _, MsgId, Msg, State2} = checkout_one(State1),
- % immediately settle
- {State, _, Effects} = apply(Meta, make_settle(ConsumerId, [MsgId]), State2),
- {State, {dequeue, {MsgId, Msg}}, Effects};
-apply(_, #checkout{spec = {dequeue, unsettled},
- meta = ConsumerMeta, consumer_id = {_, Pid} = ConsumerId},
- State0) ->
- State1 = update_consumer(ConsumerId, ConsumerMeta,
- {once, 1, simple_prefetch}, State0),
- case checkout_one(State1) of
- {success, _, MsgId, Msg, S} ->
- {S, {dequeue, {MsgId, Msg}}, [{monitor, process, Pid}]};
- {inactive, S} ->
- {S, {dequeue, empty}, [{aux, inactive}]};
- S ->
- {S, {dequeue, empty}}
+ #state{consumers = Consumers} = State0) ->
+ Exists = maps:is_key(ConsumerId, Consumers),
+ case messages_ready(State0) of
+ 0 ->
+ {State0, {dequeue, empty}};
+ _ when Exists ->
+ %% a dequeue using the same consumer_id isn't possible at this point
+ {State0, {dequeue, empty}};
+ _ ->
+ State1 = update_consumer(ConsumerId, ConsumerMeta,
+ {once, 1, simple_prefetch}, State0),
+ {success, _, MsgId, Msg, State2} = checkout_one(State1),
+ case Settlement of
+ unsettled ->
+ {_, Pid} = ConsumerId,
+ {State2, {dequeue, {MsgId, Msg}},
+ [{monitor, process, Pid}]};
+ settled ->
+ %% immediately settle the checkout
+ {State, _, Effects} = apply(Meta,
+ make_settle(ConsumerId, [MsgId]),
+ State2),
+ {State, {dequeue, {MsgId, Msg}}, Effects}
+ end
end;
-apply(_, #checkout{spec = cancel, consumer_id = ConsumerId}, State0) ->
+apply(Meta, #checkout{spec = cancel, consumer_id = ConsumerId}, State0) ->
{State, Effects} = cancel_consumer(ConsumerId, State0, []),
% TODO: here we should really demonitor the pid but _only_ if it has no
- % other consumers or enqueuers.
- checkout(State, Effects);
-apply(_, #checkout{spec = Spec, meta = Meta,
- consumer_id = {_, Pid} = ConsumerId},
+ % other consumers or enqueuers. leaving a monitor in place isn't harmful
+ % however
+ checkout(Meta, State, Effects);
+apply(Meta, #checkout{spec = Spec, meta = ConsumerMeta,
+ consumer_id = {_, Pid} = ConsumerId},
State0) ->
- State1 = update_consumer(ConsumerId, Meta, Spec, State0),
- checkout(State1, [{monitor, process, Pid}]);
+ State1 = update_consumer(ConsumerId, ConsumerMeta, Spec, State0),
+ checkout(Meta, State1, [{monitor, process, Pid}]);
apply(#{index := RaftIdx}, #purge{},
#state{ra_indexes = Indexes0,
messages = Messages} = State0) ->
@@ -486,7 +490,7 @@ apply(_, {down, ConsumerPid, noconnection},
%% TODO: should we run a checkout here?
{State#state{consumers = Cons, enqueuers = Enqs,
waiting_consumers = WaitingConsumers}, ok, Effects};
-apply(_, {down, Pid, _Info}, #state{consumers = Cons0,
+apply(Meta, {down, Pid, _Info}, #state{consumers = Cons0,
enqueuers = Enqs0} = State0) ->
% Remove any enqueuer for the same pid and enqueue any pending messages
% This should be ok as we won't see any more enqueues from this pid
@@ -506,8 +510,8 @@ apply(_, {down, Pid, _Info}, #state{consumers = Cons0,
{State, Effects} = lists:foldl(fun(ConsumerId, {S, E}) ->
cancel_consumer(ConsumerId, S, E)
end, {State2, Effects1}, DownConsumers),
- checkout(State, Effects);
-apply(_, {nodeup, Node}, #state{consumers = Cons0,
+ checkout(Meta, State, Effects);
+apply(Meta, {nodeup, Node}, #state{consumers = Cons0,
enqueuers = Enqs0,
service_queue = SQ0} = State0) ->
%% A node we are monitoring has come back.
@@ -535,13 +539,13 @@ apply(_, {nodeup, Node}, #state{consumers = Cons0,
Acc
end, {Cons0, SQ0, Monitors}, Cons0),
% TODO: avoid list concat
- checkout(State0#state{consumers = Cons1, enqueuers = Enqs1,
- service_queue = SQ,
- waiting_consumers = WaitingConsumers}, Effects);
+ checkout(Meta, State0#state{consumers = Cons1, enqueuers = Enqs1,
+ service_queue = SQ,
+ waiting_consumers = WaitingConsumers}, Effects);
apply(_, {nodedown, _Node}, State) ->
{State, ok};
-apply(_, #update_config{config = Conf}, State) ->
- {update_config(Conf, State), ok}.
+apply(Meta, #update_config{config = Conf}, State) ->
+ checkout(Meta, update_config(Conf, State), []).
handle_waiting_consumer_down(_Pid,
#state{consumer_strategy = default} = State) ->
@@ -592,7 +596,7 @@ state_enter(leader, #state{consumers = Cons,
enqueuers = Enqs,
waiting_consumers = WaitingConsumers,
name = Name,
- prefix_msg_counts = {0, 0},
+ prefix_msgs = {[], []},
become_leader_handler = BLH}) ->
% return effects to monitor all current consumers and enqueuers
Pids = lists:usort(maps:keys(Enqs)
@@ -608,10 +612,10 @@ state_enter(leader, #state{consumers = Cons,
{Mod, Fun, Args} ->
[{mod_call, Mod, Fun, Args ++ [Name]} | Effects]
end;
-state_enter(recovered, #state{prefix_msg_counts = PrefixMsgCounts})
- when PrefixMsgCounts =/= {0, 0} ->
+state_enter(recovered, #state{prefix_msgs = PrefixMsgCounts})
+ when PrefixMsgCounts =/= {[], []} ->
%% TODO: remove assertion?
- exit({rabbit_fifo, unexpected_prefix_msg_counts, PrefixMsgCounts});
+ exit({rabbit_fifo, unexpected_prefix_msgs, PrefixMsgCounts});
state_enter(eol, #state{enqueuers = Enqs,
consumers = Custs0,
waiting_consumers = WaitingConsumers0}) ->
@@ -629,14 +633,12 @@ state_enter(_, _) ->
-spec tick(non_neg_integer(), state()) -> ra_machine:effects().
tick(_Ts, #state{name = Name,
queue_resource = QName,
- messages = Messages,
- ra_indexes = Indexes,
msg_bytes_enqueue = EnqueueBytes,
msg_bytes_checkout = CheckoutBytes} = State) ->
Metrics = {Name,
- maps:size(Messages), % Ready
+ messages_ready(State),
num_checked_out(State), % checked out
- rabbit_fifo_index:size(Indexes), %% Total
+ messages_total(State),
query_consumer_count(State), % Consumers
EnqueueBytes,
CheckoutBytes},
@@ -649,8 +651,7 @@ overview(#state{consumers = Cons,
messages = Messages,
ra_indexes = Indexes,
msg_bytes_enqueue = EnqueueBytes,
- msg_bytes_checkout = CheckoutBytes
- } = State) ->
+ msg_bytes_checkout = CheckoutBytes} = State) ->
#{type => ?MODULE,
num_consumers => maps:size(Cons),
num_checked_out => num_checked_out(State),
@@ -693,13 +694,17 @@ handle_aux(_, cast, Cmd, {Name, Use0}, Log, _) ->
%%% Queries
-query_messages_ready(#state{messages = M}) ->
- M.
+%% TODO: this doesn't take returns into account
+query_messages_ready(State) ->
+ messages_ready(State).
query_messages_checked_out(#state{consumers = Consumers}) ->
maps:fold(fun (_, #consumer{checked_out = C}, S) ->
- maps:merge(S, maps:from_list(maps:values(C)))
- end, #{}, Consumers).
+ maps:size(C) + S
+ end, 0, Consumers).
+
+query_messages_total(State) ->
+ messages_total(State).
query_processes(#state{enqueuers = Enqs, consumers = Cons0}) ->
Cons = maps:fold(fun({_, P}, V, S) -> S#{P => V} end, #{}, Cons0),
@@ -770,6 +775,18 @@ usage(Name) when is_atom(Name) ->
%%% Internal
+messages_ready(#state{messages = M,
+ prefix_msgs = {PreR, PreM},
+ returns = R}) ->
+
+ %% TODO: optimise to avoid length/1 call
+ maps:size(M) + lqueue:len(R) + length(PreR) + length(PreM).
+
+messages_total(#state{ra_indexes = I,
+ prefix_msgs = {PreR, PreM}}) ->
+
+ rabbit_fifo_index:size(I) + length(PreR) + length(PreM).
+
update_use({inactive, _, _, _} = CUInfo, inactive) ->
CUInfo;
update_use({active, _, _} = CUInfo, active) ->
@@ -879,6 +896,35 @@ cancel_consumer0(ConsumerId,
{S0, Effects0}
end.
+apply_enqueue(#{index := RaftIdx} = Meta, From, Seq, RawMsg, State0) ->
+ Bytes = message_size(RawMsg),
+ case maybe_enqueue(RaftIdx, From, Seq, RawMsg, [], State0) of
+ {ok, State1, Effects1} ->
+ State2 = append_to_master_index(RaftIdx,
+ add_bytes_enqueue(Bytes, State1)),
+ {State, ok, Effects} = checkout(Meta, State2, Effects1),
+ {maybe_store_dehydrated_state(RaftIdx, State), ok, Effects};
+ {duplicate, State, Effects} ->
+ {State, ok, Effects}
+ end.
+
+drop_head(#state{ra_indexes = Indexes0} = State0, Effects0) ->
+ case take_next_msg(State0) of
+ {FullMsg = {_MsgId, {RaftIdxToDrop, {_Header, Msg}}},
+ State1} ->
+ Indexes = rabbit_fifo_index:delete(RaftIdxToDrop, Indexes0),
+ Bytes = message_size(Msg),
+ State = add_bytes_drop(Bytes, State1#state{ra_indexes = Indexes}),
+ Effects = dead_letter_effects(maps:put(none, FullMsg, #{}),
+ State, Effects0),
+ {State, Effects};
+ {{'$prefix_msg', Bytes}, State1} ->
+ State = add_bytes_drop(Bytes, State1),
+ {State, Effects0};
+ empty ->
+ {State0, Effects0}
+ end.
+
enqueue(RaftIdx, RawMsg, #state{messages = Messages,
low_msg_num = LowMsgNum,
next_msg_num = NextMsgNum} = State0) ->
@@ -889,19 +935,28 @@ enqueue(RaftIdx, RawMsg, #state{messages = Messages,
low_msg_num = min(LowMsgNum, NextMsgNum),
next_msg_num = NextMsgNum + 1}.
-append_to_master_index(RaftIdx, Effects,
+append_to_master_index(RaftIdx,
#state{ra_indexes = Indexes0} = State0) ->
- {State, Shadow} = incr_enqueue_count(State0),
- Indexes = rabbit_fifo_index:append(RaftIdx, Shadow, Indexes0),
- {State#state{ra_indexes = Indexes}, ok, Effects}.
+ State = incr_enqueue_count(State0),
+ Indexes = rabbit_fifo_index:append(RaftIdx, undefined, Indexes0),
+ State#state{ra_indexes = Indexes}.
incr_enqueue_count(#state{enqueue_count = C,
shadow_copy_interval = C} = State0) ->
- % time to stash a dehydrated state version
- State = State0#state{enqueue_count = 0},
- {State, dehydrate_state(State)};
+ % this will trigger a dehydrated version of the state to be stored
+ % at this raft index for potential future snapshot generation
+ State0#state{enqueue_count = 0};
incr_enqueue_count(#state{enqueue_count = C} = State) ->
- {State#state{enqueue_count = C + 1}, undefined}.
+ State#state{enqueue_count = C + 1}.
+
+maybe_store_dehydrated_state(RaftIdx, #state{enqueue_count = 0,
+ ra_indexes = Indexes} = State) ->
+ Dehydrated = dehydrate_state(State),
+ State#state{ra_indexes =
+ rabbit_fifo_index:update_if_present(RaftIdx, Dehydrated,
+ Indexes)};
+maybe_store_dehydrated_state(_RaftIdx, State) ->
+ State.
enqueue_pending(From,
@@ -916,7 +971,8 @@ enqueue_pending(From, Enq, #state{enqueuers = Enqueuers0} = State) ->
maybe_enqueue(RaftIdx, undefined, undefined, RawMsg, Effects, State0) ->
% direct enqueue without tracking
- {ok, enqueue(RaftIdx, RawMsg, State0), Effects};
+ State = enqueue(RaftIdx, RawMsg, State0),
+ {ok, State, Effects};
maybe_enqueue(RaftIdx, From, MsgSeqNo, RawMsg, Effects0,
#state{enqueuers = Enqueuers0} = State0) ->
case maps:get(From, Enqueuers0, undefined) of
@@ -947,19 +1003,19 @@ maybe_enqueue(RaftIdx, From, MsgSeqNo, RawMsg, Effects0,
snd(T) ->
element(2, T).
-return(ConsumerId, MsgNumMsgs, Con0, Checked,
+return(Meta, ConsumerId, MsgNumMsgs, Con0, Checked,
Effects0, #state{consumers = Cons0, service_queue = SQ0} = State0) ->
Con = Con0#consumer{checked_out = Checked,
credit = increase_credit(Con0, length(MsgNumMsgs))},
{Cons, SQ, Effects} = update_or_remove_sub(ConsumerId, Con, Cons0,
SQ0, Effects0),
- State1 = lists:foldl(fun('$prefix_msg' = Msg, S0) ->
+ State1 = lists:foldl(fun({'$prefix_msg', _} = Msg, S0) ->
return_one(0, Msg, S0);
({MsgNum, Msg}, S0) ->
return_one(MsgNum, Msg, S0)
end, State0, MsgNumMsgs),
- checkout(State1#state{consumers = Cons,
- service_queue = SQ},
+ checkout(Meta, State1#state{consumers = Cons,
+ service_queue = SQ},
Effects).
% used to processes messages that are finished
@@ -990,7 +1046,7 @@ increase_credit(#consumer{lifetime = auto,
increase_credit(#consumer{credit = Current}, Credit) ->
Current + Credit.
-complete_and_checkout(IncomingRaftIdx, MsgIds, ConsumerId,
+complete_and_checkout(#{index := IncomingRaftIdx} = Meta, MsgIds, ConsumerId,
#consumer{checked_out = Checked0} = Con0,
Effects0, #state{ra_indexes = Indexes0} = State0) ->
Checked = maps:without(MsgIds, Checked0),
@@ -998,15 +1054,15 @@ complete_and_checkout(IncomingRaftIdx, MsgIds, ConsumerId,
MsgRaftIdxs = [RIdx || {_, {RIdx, _}} <- maps:values(Discarded)],
State1 = lists:foldl(fun({_, {_, {_, RawMsg}}}, Acc) ->
add_bytes_settle(RawMsg, Acc);
- (_, Acc) ->
- Acc
+ ({'$prefix_msg', _} = M, Acc) ->
+ add_bytes_settle(M, Acc)
end, State0, maps:values(Discarded)),
%% need to pass the length of discarded as $prefix_msgs would be filtered
%% by the above list comprehension
{State2, Effects1} = complete(ConsumerId, MsgRaftIdxs,
maps:size(Discarded),
Con0, Checked, Effects0, State1),
- {State, ok, Effects} = checkout(State2, Effects1),
+ {State, ok, Effects} = checkout(Meta, State2, Effects1),
% settle metrics are incremented separately
update_smallest_raft_index(IncomingRaftIdx, Indexes0, State, Effects).
@@ -1016,7 +1072,7 @@ dead_letter_effects(_Discarded,
Effects;
dead_letter_effects(Discarded,
#state{dead_letter_handler = {Mod, Fun, Args}}, Effects) ->
- DeadLetters = maps:fold(fun(_, {_, {_, {_, Msg}}},
+ DeadLetters = maps:fold(fun(_, {_, {_, {_Header, Msg}}},
% MsgId, MsgIdID, RaftId, Header
Acc) -> [{rejected, Msg} | Acc]
end, [], Discarded),
@@ -1028,7 +1084,6 @@ cancel_consumer_effects(ConsumerId, #state{queue_resource = QName}, Effects) ->
update_smallest_raft_index(IncomingRaftIdx, OldIndexes,
#state{ra_indexes = Indexes,
- % prefix_msg_count = 0,
messages = Messages} = State, Effects) ->
case rabbit_fifo_index:size(Indexes) of
0 when map_size(Messages) =:= 0 ->
@@ -1057,35 +1112,45 @@ update_smallest_raft_index(IncomingRaftIdx, OldIndexes,
end.
% TODO update message then update messages and returns in single operations
-return_one(0, '$prefix_msg',
+return_one(0, {'$prefix_msg', _} = Msg,
#state{returns = Returns} = State0) ->
- State0#state{returns = lqueue:in('$prefix_msg', Returns)};
+ add_bytes_return(Msg,
+ State0#state{returns = lqueue:in(Msg, Returns)});
return_one(MsgNum, {RaftId, {Header0, RawMsg}},
- #state{messages = Messages,
- returns = Returns} = State0) ->
+ #state{returns = Returns} = State0) ->
Header = maps:update_with(delivery_count,
fun (C) -> C+1 end,
1, Header0),
Msg = {RaftId, {Header, RawMsg}},
% this should not affect the release cursor in any way
add_bytes_return(RawMsg,
- State0#state{messages = maps:put(MsgNum, Msg, Messages),
- returns = lqueue:in(MsgNum, Returns)}).
+ State0#state{returns = lqueue:in({MsgNum, Msg}, Returns)}).
-return_all(State, Checked0) ->
+return_all(State0, Checked0) ->
%% need to sort the list so that we return messages in the order
%% they were checked out
Checked = lists:sort(maps:to_list(Checked0)),
- lists:foldl(fun ({_, '$prefix_msg'}, S) ->
- return_one(0, '$prefix_msg', S);
+ lists:foldl(fun ({_, {'$prefix_msg', _} = Msg}, S) ->
+ return_one(0, Msg, S);
({_, {MsgNum, Msg}}, S) ->
return_one(MsgNum, Msg, S)
- end, State, Checked).
+ end, State0, Checked).
+
+
%% checkout new messages to consumers
%% reverses the effects list
-checkout(State, Effects) ->
- checkout0(checkout_one(State), Effects, #{}).
+checkout(#{index := Index}, State0, Effects0) ->
+ {State1, _Result, Effects1} = checkout0(checkout_one(State0),
+ Effects0, #{}),
+ case evaluate_limit(State0#state.ra_indexes, false,
+ State1, Effects1) of
+ {State, true, Effects} ->
+ update_smallest_raft_index(Index, State0#state.ra_indexes,
+ State, Effects);
+ {State, false, Effects} ->
+ {State, ok, Effects}
+ end.
checkout0({success, ConsumerId, MsgId, Msg, State}, Effects, Acc0) ->
DelMsg = {MsgId, Msg},
@@ -1093,12 +1158,31 @@ checkout0({success, ConsumerId, MsgId, Msg, State}, Effects, Acc0) ->
fun (M) -> [DelMsg | M] end,
[DelMsg], Acc0),
checkout0(checkout_one(State), Effects, Acc);
-checkout0({inactive, State}, Effects0, Acc) ->
- Effects = append_send_msg_effects(Effects0, Acc),
- {State, ok, lists:reverse([{aux, inactive} | Effects])};
-checkout0(State, Effects0, Acc) ->
- Effects = append_send_msg_effects(Effects0, Acc),
- {State, ok, lists:reverse(Effects)}.
+checkout0({Activity, State0}, Effects0, Acc) ->
+ Effects1 = case Activity of
+ nochange ->
+ append_send_msg_effects(Effects0, Acc);
+ inactive ->
+ [{aux, inactive}
+ | append_send_msg_effects(Effects0, Acc)]
+ end,
+ {State0, ok, lists:reverse(Effects1)}.
+
+evaluate_limit(_OldIndexes, Result,
+ #state{max_length = undefined,
+ max_bytes = undefined} = State,
+ Effects) ->
+ {State, Result, Effects};
+evaluate_limit(OldIndexes, Result,
+ State0, Effects0) ->
+ case is_over_limit(State0) of
+ true ->
+ {State, Effects} = drop_head(State0, Effects0),
+ evaluate_limit(OldIndexes, true, State, Effects);
+ false ->
+ {State0, Result, Effects0}
+ %% TODO: optimisation: avoid calling if nothing has been dropped?
+ end.
append_send_msg_effects(Effects, AccMap) when map_size(AccMap) == 0 ->
Effects;
@@ -1108,57 +1192,51 @@ append_send_msg_effects(Effects0, AccMap) ->
end, Effects0, AccMap),
[{aux, active} | Effects].
-next_checkout_message(#state{prefix_msg_counts = {PReturns, P}} = State)
- when PReturns > 0 ->
+%% next message is determined as follows:
+%% First we check if there are are prefex returns
+%% Then we check if there are current returns
+%% then we check prefix msgs
+%% then we check current messages
+%%
+%% When we return it is always done to the current return queue
+%% for both prefix messages and current messages
+take_next_msg(#state{prefix_msgs = {[Bytes | Rem], P}} = State) ->
%% there are prefix returns, these should be served first
- {'$prefix_msg', State#state{prefix_msg_counts = {PReturns - 1, P}}};
-next_checkout_message(#state{returns = Returns,
- low_msg_num = Low0,
- prefix_msg_counts = {R, P},
- next_msg_num = NextMsgNum} = State) ->
+ {{'$prefix_msg', Bytes},
+ State#state{prefix_msgs = {Rem, P}}};
+take_next_msg(#state{returns = Returns,
+ low_msg_num = Low0,
+ messages = Messages0,
+ prefix_msgs = {R, P}} = State) ->
%% use peek rather than out there as the most likely case is an empty
%% queue
case lqueue:peek(Returns) of
- {value, Next} ->
- {Next, State#state{returns = lqueue:drop(Returns)}};
- empty when P == 0 ->
+ {value, NextMsg} ->
+ {NextMsg,
+ State#state{returns = lqueue:drop(Returns)}};
+ empty when P == [] ->
case Low0 of
undefined ->
- {undefined, State};
+ empty;
_ ->
- case Low0 + 1 of
- NextMsgNum ->
- %% the map will be empty after this item is removed
- {Low0, State#state{low_msg_num = undefined}};
- Low ->
- {Low0, State#state{low_msg_num = Low}}
+ %% TODO: defensive?
+ {Msg, Messages} = maps:take(Low0, Messages0),
+ case maps:size(Messages) of
+ 0 ->
+ {{Low0, Msg},
+ State#state{messages = Messages,
+ low_msg_num = undefined}};
+ _ ->
+ {{Low0, Msg},
+ State#state{messages = Messages,
+ low_msg_num = Low0 + 1}}
end
end;
empty ->
+ [Bytes | Rem] = P,
%% There are prefix msgs
- {'$prefix_msg', State#state{prefix_msg_counts = {R, P - 1}}}
- end.
-
-%% next message is determined as follows:
-%% First we check if there are are prefex returns
-%% Then we check if there are current returns
-%% then we check prefix msgs
-%% then we check current messages
-%%
-%% When we return it is always done to the current return queue
-%% for both prefix messages and current messages
-take_next_msg(#state{messages = Messages0} = State0) ->
- case next_checkout_message(State0) of
- {'$prefix_msg', State} ->
- {'$prefix_msg', State, Messages0};
- {NextMsgInId, State} ->
- %% messages are available
- case maps:take(NextMsgInId, Messages0) of
- {IdxMsg, Messages} ->
- {{NextMsgInId, IdxMsg}, State, Messages};
- error ->
- error
- end
+ {{'$prefix_msg', Bytes},
+ State#state{prefix_msgs = {R, Rem}}}
end.
send_msg_effect({CTag, CPid}, Msgs) ->
@@ -1170,7 +1248,7 @@ checkout_one(#state{service_queue = SQ0,
case queue:peek(SQ0) of
{value, ConsumerId} ->
case take_next_msg(InitState) of
- {ConsumerMsg, State0, Messages} ->
+ {ConsumerMsg, State0} ->
SQ1 = queue:drop(SQ0),
%% there are consumers waiting to be serviced
%% process consumer checkout
@@ -1195,31 +1273,31 @@ checkout_one(#state{service_queue = SQ0,
update_or_remove_sub(ConsumerId, Con,
Cons0, SQ1, []),
State1 = State0#state{service_queue = SQ,
- messages = Messages,
consumers = Cons},
{State, Msg} =
case ConsumerMsg of
- '$prefix_msg' ->
- {State1, '$prefix_msg'};
+ {'$prefix_msg', _} ->
+ {add_bytes_checkout(ConsumerMsg, State1),
+ ConsumerMsg};
{_, {_, {_, RawMsg} = M}} ->
- {add_bytes_checkout(RawMsg, State1), M}
+ {add_bytes_checkout(RawMsg, State1),
+ M}
end,
{success, ConsumerId, Next, Msg, State};
error ->
%% consumer did not exist but was queued, recurse
checkout_one(InitState#state{service_queue = SQ1})
end;
- error ->
- InitState
+ empty ->
+ {nochange, InitState}
end;
empty ->
case maps:size(Messages0) of
- 0 -> InitState;
+ 0 -> {nochange, InitState};
_ -> {inactive, InitState}
end
end.
-
update_or_remove_sub(ConsumerId, #consumer{lifetime = auto,
credit = 0} = Con,
Cons, ServiceQueue, Effects) ->
@@ -1262,14 +1340,14 @@ update_consumer(ConsumerId, Meta, Spec,
%% general case, single active consumer off
update_consumer0(ConsumerId, Meta, Spec, State0);
update_consumer(ConsumerId, Meta, Spec,
- #state{consumers = Cons0,
+ #state{consumers = Cons0,
consumer_strategy = single_active} = State0)
when map_size(Cons0) == 0 ->
%% single active consumer on, no one is consuming yet
update_consumer0(ConsumerId, Meta, Spec, State0);
update_consumer(ConsumerId, Meta, {Life, Credit, Mode},
- #state{consumer_strategy = single_active,
- waiting_consumers = WaitingConsumers0} = State0) ->
+ #state{consumer_strategy = single_active,
+ waiting_consumers = WaitingConsumers0} = State0) ->
%% single active consumer on and one active consumer already
%% adding the new consumer to the waiting list
Consumer = #consumer{lifetime = Life, meta = Meta,
@@ -1289,11 +1367,10 @@ update_consumer0(ConsumerId, Meta, {Life, Credit, Mode},
%% the credit update
N = maps:size(S#consumer.checked_out),
C = max(0, Credit - N),
- S#consumer{lifetime = Life,
- credit = C}
+ S#consumer{lifetime = Life, credit = C}
end, Init, Cons0),
ServiceQueue = maybe_queue_consumer(ConsumerId, maps:get(ConsumerId, Cons),
- ServiceQueue0),
+ ServiceQueue0),
State0#state{consumers = Cons, service_queue = ServiceQueue}.
@@ -1313,13 +1390,20 @@ maybe_queue_consumer(ConsumerId, #consumer{credit = Credit},
dehydrate_state(#state{messages = Messages,
consumers = Consumers,
returns = Returns,
- prefix_msg_counts = {PrefRetCnt, MsgCount}} = State) ->
- %% TODO: optimise to avoid having to iterate the queue to get the number
- %% of current returned messages
- RetLen = lqueue:len(Returns), % O(1)
- CurReturns = length([R || R <- lqueue:to_list(Returns),
- R =/= '$prefix_msg']),
- PrefixMsgCnt = MsgCount + maps:size(Messages) - CurReturns,
+ prefix_msgs = {PrefRet0, PrefMsg0}} = State) ->
+ %% TODO: optimise this function as far as possible
+ PrefRet = lists:foldl(fun ({'$prefix_msg', Bytes}, Acc) ->
+ [Bytes | Acc];
+ ({_, {_, {_, Raw}}}, Acc) ->
+ [message_size(Raw) | Acc]
+ end,
+ lists:reverse(PrefRet0),
+ lqueue:to_list(Returns)),
+ PrefMsgs = lists:foldl(fun ({_, {_RaftIdx, {_H, Raw}}}, Acc) ->
+ [message_size(Raw) | Acc]
+ end,
+ lists:reverse(PrefMsg0),
+ lists:sort(maps:to_list(Messages))),
State#state{messages = #{},
ra_indexes = rabbit_fifo_index:empty(),
low_msg_num = undefined,
@@ -1327,14 +1411,26 @@ dehydrate_state(#state{messages = Messages,
dehydrate_consumer(C)
end, Consumers),
returns = lqueue:new(),
- %% messages include returns
- prefix_msg_counts = {RetLen + PrefRetCnt,
- PrefixMsgCnt}}.
+ prefix_msgs = {lists:reverse(PrefRet),
+ lists:reverse(PrefMsgs)}}.
dehydrate_consumer(#consumer{checked_out = Checked0} = Con) ->
- Checked = maps:map(fun (_, _) -> '$prefix_msg' end, Checked0),
+ Checked = maps:map(fun (_, {'$prefix_msg', _} = M) ->
+ M;
+ (_, {_, {_, {_, Raw}}}) ->
+ {'$prefix_msg', message_size(Raw)}
+ end, Checked0),
Con#consumer{checked_out = Checked}.
+is_over_limit(#state{max_length = undefined,
+ max_bytes = undefined}) ->
+ false;
+is_over_limit(#state{max_length = MaxLength,
+ max_bytes = MaxBytes,
+ msg_bytes_enqueue = BytesEnq} = State) ->
+
+ messages_ready(State) > MaxLength orelse (BytesEnq > MaxBytes).
+
-spec make_enqueue(maybe(pid()), maybe(msg_seqno()), raw_msg()) -> protocol().
make_enqueue(Pid, Seq, Msg) ->
#enqueue{pid = Pid, seq = Seq, msg = Msg}.
@@ -1371,10 +1467,12 @@ make_purge() -> #purge{}.
make_update_config(Config) ->
#update_config{config = Config}.
-add_bytes_enqueue(Msg, #state{msg_bytes_enqueue = Enqueue} = State) ->
- Bytes = message_size(Msg),
+add_bytes_enqueue(Bytes, #state{msg_bytes_enqueue = Enqueue} = State) ->
State#state{msg_bytes_enqueue = Enqueue + Bytes}.
+add_bytes_drop(Bytes, #state{msg_bytes_enqueue = Enqueue} = State) ->
+ State#state{msg_bytes_enqueue = Enqueue - Bytes}.
+
add_bytes_checkout(Msg, #state{msg_bytes_checkout = Checkout,
msg_bytes_enqueue = Enqueue } = State) ->
Bytes = message_size(Msg),
@@ -1394,6 +1492,8 @@ add_bytes_return(Msg, #state{msg_bytes_checkout = Checkout,
message_size(#basic_message{content = Content}) ->
#content{payload_fragments_rev = PFR} = Content,
iolist_size(PFR);
+message_size({'$prefix_msg', B}) ->
+ B;
message_size(B) when is_binary(B) ->
byte_size(B);
message_size(Msg) ->
@@ -1554,7 +1654,7 @@ enq_enq_deq_deq_settle_test() ->
{State3, {dequeue, {0, {_, first}}}, [{monitor, _, _}]} =
apply(meta(3), make_checkout(Cid, {dequeue, unsettled}, #{}),
State2),
- {_State4, {dequeue, empty}, _} =
+ {_State4, {dequeue, empty}} =
apply(meta(4), make_checkout(Cid, {dequeue, unsettled}, #{}),
State3),
ok.
@@ -1599,7 +1699,7 @@ release_cursor_test() ->
checkout_enq_settle_test() ->
Cid = {?FUNCTION_NAME, self()},
- {State1, [{monitor, _, _}]} = check(Cid, 1, test_init(test)),
+ {State1, [{monitor, _, _} | _]} = check(Cid, 1, test_init(test)),
{State2, Effects0} = enq(2, 1, first, State1),
?ASSERT_EFF({send_msg, _,
{delivery, ?FUNCTION_NAME,
@@ -1614,7 +1714,7 @@ checkout_enq_settle_test() ->
out_of_order_enqueue_test() ->
Cid = {?FUNCTION_NAME, self()},
- {State1, [{monitor, _, _}]} = check_n(Cid, 5, 5, test_init(test)),
+ {State1, [{monitor, _, _} | _]} = check_n(Cid, 5, 5, test_init(test)),
{State2, Effects2} = enq(2, 1, first, State1),
?ASSERT_EFF({send_msg, _, {delivery, _, [{_, {_, first}}]}, _}, Effects2),
% assert monitor was set up
@@ -1644,7 +1744,7 @@ out_of_order_first_enqueue_test() ->
duplicate_enqueue_test() ->
Cid = {<<"duplicate_enqueue_test">>, self()},
- {State1, [{monitor, _, _}]} = check_n(Cid, 5, 5, test_init(test)),
+ {State1, [{monitor, _, _} | _]} = check_n(Cid, 5, 5, test_init(test)),
{State2, Effects2} = enq(2, 1, first, State1),
?ASSERT_EFF({send_msg, _, {delivery, _, [{_, {_, first}}]}, _}, Effects2),
{_State3, Effects3} = enq(3, 1, first, State2),
@@ -1663,10 +1763,10 @@ return_checked_out_test() ->
{State0, [_, _]} = enq(1, 1, first, test_init(test)),
{State1, [_Monitor,
{send_msg, _, {delivery, _, [{MsgId, _}]}, ra_event},
- {aux, active}
+ {aux, active} | _
]} = check(Cid, 2, State0),
% return
- {_State2, _, [_, _]} = apply(meta(3), make_return(Cid, [MsgId]), State1),
+ {_State2, _, [_]} = apply(meta(3), make_return(Cid, [MsgId]), State1),
ok.
return_auto_checked_out_test() ->
@@ -1695,7 +1795,8 @@ cancelled_checkout_out_test() ->
{State1, _} = check_auto(Cid, 2, State0),
% cancelled checkout should return all pending messages to queue
{State2, _, _} = apply(meta(3), make_checkout(Cid, cancel, #{}), State1),
- ?assertEqual(2, maps:size(State2#state.messages)),
+ ?assertEqual(1, maps:size(State2#state.messages)),
+ ?assertEqual(1, lqueue:len(State2#state.returns)),
{State3, {dequeue, {0, {_, first}}}, _} =
apply(meta(3), make_checkout(Cid, {dequeue, settled}, #{}), State2),
@@ -1748,14 +1849,14 @@ down_with_noconnection_returns_unack_test() ->
?assertEqual(0, maps:size(State1#state.messages)),
?assertEqual(0, lqueue:len(State1#state.returns)),
{State2a, _, _} = apply(meta(3), {down, Pid, noconnection}, State1),
- ?assertEqual(1, maps:size(State2a#state.messages)),
+ ?assertEqual(0, maps:size(State2a#state.messages)),
?assertEqual(1, lqueue:len(State2a#state.returns)),
ok.
down_with_noproc_enqueuer_is_cleaned_up_test() ->
State00 = test_init(test),
Pid = spawn(fun() -> ok end),
- {State0, _, Effects0} = apply(meta(1), {enqueue, Pid, 1, first}, State00),
+ {State0, _, Effects0} = apply(meta(1), make_enqueue(Pid, 1, first), State00),
?ASSERT_EFF({monitor, process, _}, Effects0),
{State1, _, _} = apply(meta(3), {down, Pid, noproc}, State0),
% ensure there are no enqueuers
@@ -2116,7 +2217,7 @@ single_active_consumer_test() ->
% adding some consumers
AddConsumer = fun(CTag, State) ->
{NewState, _, _} = apply(
- #{},
+ meta(1),
#checkout{spec = {once, 1, simple_prefetch},
meta = #{},
consumer_id = {CTag, self()}},
@@ -2134,7 +2235,8 @@ single_active_consumer_test() ->
?assertNotEqual(false, lists:keyfind({<<"ctag4">>, self()}, 1, State1#state.waiting_consumers)),
% cancelling a waiting consumer
- {State2, _, Effects1} = apply(#{}, #checkout{spec = cancel, consumer_id = {<<"ctag3">>, self()}}, State1),
+ {State2, _, Effects1} = apply(meta(2),
+ #checkout{spec = cancel, consumer_id = {<<"ctag3">>, self()}}, State1),
% the active consumer should still be in place
?assertEqual(1, map_size(State2#state.consumers)),
?assert(maps:is_key({<<"ctag1">>, self()}, State2#state.consumers)),
@@ -2146,7 +2248,7 @@ single_active_consumer_test() ->
?assertEqual(1, length(Effects1)),
% cancelling the active consumer
- {State3, _, Effects2} = apply(#{}, #checkout{spec = cancel, consumer_id = {<<"ctag1">>, self()}}, State2),
+ {State3, _, Effects2} = apply(meta(3), #checkout{spec = cancel, consumer_id = {<<"ctag1">>, self()}}, State2),
% the second registered consumer is now the active one
?assertEqual(1, map_size(State3#state.consumers)),
?assert(maps:is_key({<<"ctag2">>, self()}, State3#state.consumers)),
@@ -2157,7 +2259,7 @@ single_active_consumer_test() ->
?assertEqual(2, length(Effects2)),
% cancelling the active consumer
- {State4, _, Effects3} = apply(#{}, #checkout{spec = cancel, consumer_id = {<<"ctag2">>, self()}}, State3),
+ {State4, _, Effects3} = apply(meta(4), #checkout{spec = cancel, consumer_id = {<<"ctag2">>, self()}}, State3),
% the last waiting consumer became the active one
?assertEqual(1, map_size(State4#state.consumers)),
?assert(maps:is_key({<<"ctag4">>, self()}, State4#state.consumers)),
@@ -2167,7 +2269,7 @@ single_active_consumer_test() ->
?assertEqual(2, length(Effects3)),
% cancelling the last consumer
- {State5, _, Effects4} = apply(#{}, #checkout{spec = cancel, consumer_id = {<<"ctag4">>, self()}}, State4),
+ {State5, _, Effects4} = apply(meta(5), #checkout{spec = cancel, consumer_id = {<<"ctag4">>, self()}}, State4),
% no active consumer anymore
?assertEqual(0, map_size(State5#state.consumers)),
% still nothing in the waiting list
@@ -2192,7 +2294,7 @@ single_active_consumer_cancel_consumer_when_channel_is_down_test() ->
% adding some consumers
AddConsumer = fun({CTag, ChannelId}, State) ->
{NewState, _, _} = apply(
- #{},
+ #{index => 1},
#checkout{spec = {once, 1, simple_prefetch},
meta = #{},
consumer_id = {CTag, ChannelId}},
@@ -2203,16 +2305,17 @@ single_active_consumer_cancel_consumer_when_channel_is_down_test() ->
[{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]),
% the channel of the active consumer goes down
- {State2, _, Effects} = apply(#{}, {down, Pid1, doesnotmatter}, State1),
+ {State2, _, Effects} = apply(#{index => 2}, {down, Pid1, doesnotmatter}, State1),
% fell back to another consumer
?assertEqual(1, map_size(State2#state.consumers)),
% there are still waiting consumers
?assertEqual(2, length(State2#state.waiting_consumers)),
- % effects to unregister the consumer and to update the new active one (metrics) are there
+ % effects to unregister the consumer and
+ % to update the new active one (metrics) are there
?assertEqual(2, length(Effects)),
% the channel of the active consumer and a waiting consumer goes down
- {State3, _, Effects2} = apply(#{}, {down, Pid2, doesnotmatter}, State2),
+ {State3, _, Effects2} = apply(#{index => 3}, {down, Pid2, doesnotmatter}, State2),
% fell back to another consumer
?assertEqual(1, map_size(State3#state.consumers)),
% no more waiting consumer
@@ -2221,7 +2324,7 @@ single_active_consumer_cancel_consumer_when_channel_is_down_test() ->
?assertEqual(3, length(Effects2)),
% the last channel goes down
- {State4, _, Effects3} = apply(#{}, {down, Pid3, doesnotmatter}, State3),
+ {State4, _, Effects3} = apply(#{index => 4}, {down, Pid3, doesnotmatter}, State3),
% no more consumers
?assertEqual(0, map_size(State4#state.consumers)),
?assertEqual(0, length(State4#state.waiting_consumers)),
@@ -2237,10 +2340,11 @@ single_active_consumer_mark_waiting_consumers_as_suspected_when_down_noconnnecti
shadow_copy_interval => 0,
single_active_consumer_on => true}),
+ Meta = #{index => 1},
% adding some consumers
AddConsumer = fun(CTag, State) ->
{NewState, _, _} = apply(
- #{},
+ Meta,
#checkout{spec = {once, 1, simple_prefetch},
meta = #{},
consumer_id = {CTag, self()}},
@@ -2259,7 +2363,7 @@ single_active_consumer_mark_waiting_consumers_as_suspected_when_down_noconnnecti
end, State2#state.waiting_consumers),
% simulate node goes back up
- {State3, _, _} = apply(#{}, {nodeup, node(self())}, State2),
+ {State3, _, _} = apply(#{index => 2}, {nodeup, node(self())}, State2),
% all the waiting consumers should be un-suspected
?assertEqual(3, length(State3#state.waiting_consumers)),
@@ -2281,10 +2385,11 @@ single_active_consumer_state_enter_leader_include_waiting_consumers_test() ->
Pid2 = spawn(DummyFunction),
Pid3 = spawn(DummyFunction),
+ Meta = #{index => 1},
% adding some consumers
AddConsumer = fun({CTag, ChannelId}, State) ->
{NewState, _, _} = apply(
- #{},
+ Meta,
#checkout{spec = {once, 1, simple_prefetch},
meta = #{},
consumer_id = {CTag, ChannelId}},
@@ -2310,10 +2415,11 @@ single_active_consumer_state_enter_eol_include_waiting_consumers_test() ->
Pid2 = spawn(DummyFunction),
Pid3 = spawn(DummyFunction),
+ Meta = #{index => 1},
% adding some consumers
AddConsumer = fun({CTag, ChannelId}, State) ->
{NewState, _, _} = apply(
- #{},
+ Meta,
#checkout{spec = {once, 1, simple_prefetch},
meta = #{},
consumer_id = {CTag, ChannelId}},
@@ -2333,11 +2439,11 @@ query_consumers_test() ->
atom_to_binary(?FUNCTION_NAME, utf8)),
shadow_copy_interval => 0,
single_active_consumer_on => true}),
-
+ Meta = #{index => 1},
% adding some consumers
AddConsumer = fun(CTag, State) ->
{NewState, _, _} = apply(
- #{},
+ Meta,
#checkout{spec = {once, 1, simple_prefetch},
meta = #{},
consumer_id = {CTag, self()}},
diff --git a/src/rabbit_fifo_index.erl b/src/rabbit_fifo_index.erl
index f8f414f453..184002611e 100644
--- a/src/rabbit_fifo_index.erl
+++ b/src/rabbit_fifo_index.erl
@@ -4,12 +4,14 @@
empty/0,
fetch/2,
append/3,
+ update_if_present/3,
return/3,
delete/2,
size/1,
smallest/1,
next_key_after/2,
- map/2
+ map/2,
+ to_map/1
]).
-include_lib("ra/include/ra.hrl").
@@ -36,12 +38,22 @@ fetch(Key, #?MODULE{data = Data}) ->
-spec append(integer(), term(), state()) -> state().
append(Key, Value,
#?MODULE{data = Data,
- smallest = Smallest,
- largest = Largest} = State)
+ smallest = Smallest,
+ largest = Largest} = State)
when Key > Largest orelse Largest =:= undefined ->
State#?MODULE{data = maps:put(Key, Value, Data),
- smallest = ra_lib:default(Smallest, Key),
- largest = Key}.
+ smallest = ra_lib:default(Smallest, Key),
+ largest = Key}.
+
+-spec update_if_present(integer(), term(), state()) -> state().
+update_if_present(Key, Value, #?MODULE{data = Data} = State) ->
+ case Data of
+ #{Key := _} ->
+ State#?MODULE{data = maps:put(Key, Value, Data)};
+ _ ->
+ State
+ end.
+
-spec return(integer(), term(), state()) -> state().
return(Key, Value, #?MODULE{data = Data, smallest = Smallest} = State)
@@ -76,6 +88,10 @@ delete(Key, #?MODULE{data = Data} = State) ->
size(#?MODULE{data = Data}) ->
maps:size(Data).
+-spec to_map(state()) -> #{integer() => term()}.
+to_map(#?MODULE{data = Data}) ->
+ Data.
+
-spec smallest(state()) -> undefined | {integer(), term()}.
smallest(#?MODULE{smallest = undefined}) ->
undefined;
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index 5c0d1c0070..1f3aa1f38b 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -152,10 +152,15 @@ ra_machine(Q) ->
ra_machine_config(Q = #amqqueue{name = QName,
pid = {Name, _}}) ->
+ %% take the minimum value of the policy and the queue arg if present
+ MaxLength = args_policy_lookup(<<"max-length">>, fun min/2, Q),
+ MaxBytes = args_policy_lookup(<<"max-length-bytes">>, fun min/2, Q),
#{name => Name,
queue_resource => QName,
dead_letter_handler => dlx_mfa(Q),
become_leader_handler => {?MODULE, become_leader, [QName]},
+ max_length => MaxLength,
+ max_bytes => MaxBytes,
single_active_consumer_on => single_active_consumer_on(Q)}.
single_active_consumer_on(#amqqueue{arguments = QArguments}) ->
@@ -636,8 +641,10 @@ delete_member(#amqqueue{pid = {RaName, _}, name = QName}, Node) ->
%%----------------------------------------------------------------------------
dlx_mfa(Q) ->
- DLX = init_dlx(args_policy_lookup(<<"dead-letter-exchange">>, fun res_arg/2, Q), Q),
- DLXRKey = args_policy_lookup(<<"dead-letter-routing-key">>, fun res_arg/2, Q),
+ DLX = init_dlx(args_policy_lookup(<<"dead-letter-exchange">>,
+ fun res_arg/2, Q), Q),
+ DLXRKey = args_policy_lookup(<<"dead-letter-routing-key">>,
+ fun res_arg/2, Q),
{?MODULE, dead_letter_publish, [DLX, DLXRKey, Q#amqqueue.name]}.
init_dlx(undefined, _Q) ->
@@ -829,9 +836,8 @@ qnode({_, Node}) ->
Node.
check_invalid_arguments(QueueName, Args) ->
- Keys = [<<"x-expires">>, <<"x-message-ttl">>, <<"x-max-length">>,
- <<"x-max-length-bytes">>, <<"x-max-priority">>, <<"x-overflow">>,
- <<"x-queue-mode">>],
+ Keys = [<<"x-expires">>, <<"x-message-ttl">>,
+ <<"x-max-priority">>, <<"x-queue-mode">>, <<"x-overflow">>],
[case rabbit_misc:table_lookup(Args, Key) of
undefined -> ok;
_TypeVal -> rabbit_misc:protocol_error(
diff --git a/test/dynamic_qq_SUITE.erl b/test/dynamic_qq_SUITE.erl
index d1158ef07a..3357e6f74b 100644
--- a/test/dynamic_qq_SUITE.erl
+++ b/test/dynamic_qq_SUITE.erl
@@ -135,9 +135,8 @@ force_delete_if_no_consensus(Config) ->
passive = true})),
%% TODO implement a force delete
BCh2 = rabbit_ct_client_helpers:open_channel(Config, B),
- ?assertExit({{shutdown,
- {connection_closing, {server_initiated_close, 541, _}}}, _},
- amqp_channel:call(BCh2, #'queue.delete'{queue = QName})),
+ ?assertMatch(#'queue.delete_ok'{},
+ amqp_channel:call(BCh2, #'queue.delete'{queue = QName})),
ok.
takeover_on_failure(Config) ->
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl
index dcba910a6a..48dac3ca57 100644
--- a/test/quorum_queue_SUITE.erl
+++ b/test/quorum_queue_SUITE.erl
@@ -22,6 +22,7 @@
-import(quorum_queue_utils, [wait_for_messages_ready/3,
wait_for_messages_pending_ack/3,
+ wait_for_messages_total/3,
dirty_query/3,
ra_name/1]).
@@ -37,6 +38,7 @@ all() ->
groups() ->
[
{single_node, [], all_tests()},
+ {single_node, [], memory_tests()},
{unclustered, [], [
{cluster_size_2, [], [add_member]}
]},
@@ -51,6 +53,7 @@ groups() ->
delete_member_not_found,
delete_member]
++ all_tests()},
+ {cluster_size_2, [], memory_tests()},
{cluster_size_3, [], [
declare_during_node_down,
simple_confirm_availability_on_leader_change,
@@ -61,7 +64,8 @@ groups() ->
delete_declare,
metrics_cleanup_on_leadership_takeover,
metrics_cleanup_on_leader_crash,
- consume_in_minority]},
+ consume_in_minority
+ ]},
{cluster_size_5, [], [start_queue,
start_queue_concurrent,
quorum_cluster_size_3,
@@ -126,6 +130,11 @@ all_tests() ->
consume_redelivery_count,
subscribe_redelivery_count,
message_bytes_metrics,
+ queue_length_limit_drop_head
+ ].
+
+memory_tests() ->
+ [
memory_alarm_rolls_wal
].
@@ -240,7 +249,9 @@ declare_args(Config) ->
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
LQ = ?config(queue_name, Config),
- declare(Ch, LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}]),
+ declare(Ch, LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
+ {<<"x-max-length">>, long, 2000},
+ {<<"x-max-length-bytes">>, long, 2000}]),
assert_queue_type(Server, LQ, quorum),
DQ = <<"classic-declare-args-q">>,
@@ -293,16 +304,6 @@ declare_invalid_args(Config) ->
declare(rabbit_ct_client_helpers:open_channel(Config, Server),
LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
{<<"x-message-ttl">>, long, 2000}])),
- ?assertExit(
- {{shutdown, {server_initiated_close, 406, _}}, _},
- declare(rabbit_ct_client_helpers:open_channel(Config, Server),
- LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
- {<<"x-max-length">>, long, 2000}])),
- ?assertExit(
- {{shutdown, {server_initiated_close, 406, _}}, _},
- declare(rabbit_ct_client_helpers:open_channel(Config, Server),
- LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
- {<<"x-max-length-bytes">>, long, 2000}])),
?assertExit(
{{shutdown, {server_initiated_close, 406, _}}, _},
@@ -314,7 +315,7 @@ declare_invalid_args(Config) ->
{{shutdown, {server_initiated_close, 406, _}}, _},
declare(rabbit_ct_client_helpers:open_channel(Config, Server),
LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
- {<<"x-overflow">>, longstr, <<"drop-head">>}])),
+ {<<"x-overflow">>, longstr, <<"reject-publish">>}])),
?assertExit(
{{shutdown, {server_initiated_close, 406, _}}, _},
@@ -1422,7 +1423,7 @@ metrics_cleanup_on_leadership_takeover(Config) ->
_ -> false
end
end),
- force_leader_change(Leader, Servers, QQ),
+ force_leader_change(Servers, QQ),
wait_until(fun () ->
[] =:= rpc:call(Leader, ets, lookup, [queue_coarse_metrics, QRes]) andalso
[] =:= rpc:call(Leader, ets, lookup, [queue_metrics, QRes])
@@ -2151,6 +2152,32 @@ memory_alarm_rolls_wal(Config) ->
timer:sleep(1000),
ok.
+queue_length_limit_drop_head(Config) ->
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
+ {<<"x-max-length">>, long, 1}])),
+
+ RaName = ra_name(QQ),
+ ok = amqp_channel:cast(Ch,
+ #'basic.publish'{routing_key = QQ},
+ #amqp_msg{props = #'P_basic'{delivery_mode = 2},
+ payload = <<"msg1">>}),
+ ok = amqp_channel:cast(Ch,
+ #'basic.publish'{routing_key = QQ},
+ #amqp_msg{props = #'P_basic'{delivery_mode = 2},
+ payload = <<"msg2">>}),
+ wait_for_consensus(QQ, Config),
+ wait_for_messages_ready(Servers, RaName, 1),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ wait_for_messages_total(Servers, RaName, 1),
+ ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"msg2">>}},
+ amqp_channel:call(Ch, #'basic.get'{queue = QQ,
+ no_ack = true})).
+
%%----------------------------------------------------------------------------
declare(Ch, Q) ->
@@ -2201,6 +2228,9 @@ filter_queues(Expected, Got) ->
lists:member(K, Keys)
end, Got).
+publish_many(Ch, Queue, Count) ->
+ [publish(Ch, Queue) || _ <- lists:seq(1, Count)].
+
publish(Ch, Queue) ->
ok = amqp_channel:cast(Ch,
#'basic.publish'{routing_key = Queue},
@@ -2268,14 +2298,16 @@ wait_until(Condition, N) ->
wait_until(Condition, N - 1)
end.
-force_leader_change(Leader, Servers, Q) ->
+
+force_leader_change([Server | _] = Servers, Q) ->
RaName = ra_name(Q),
+ {ok, _, {_, Leader}} = ra:members({RaName, Server}),
[F1, _] = Servers -- [Leader],
ok = rpc:call(F1, ra, trigger_election, [{RaName, F1}]),
case ra:members({RaName, Leader}) of
{ok, _, {_, Leader}} ->
%% Leader has been re-elected
- force_leader_change(Leader, Servers, Q);
+ force_leader_change(Servers, Q);
{ok, _, _} ->
%% Leader has changed
ok
@@ -2297,3 +2329,8 @@ get_message_bytes(Leader, QRes) ->
_ ->
[]
end.
+
+wait_for_consensus(Name, Config) ->
+ Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+ RaName = ra_name(Name),
+ {ok, _, _} = ra:members({RaName, Server}).
diff --git a/test/quorum_queue_utils.erl b/test/quorum_queue_utils.erl
index a216c220e6..6b820c7b5c 100644
--- a/test/quorum_queue_utils.erl
+++ b/test/quorum_queue_utils.erl
@@ -5,6 +5,7 @@
-export([
wait_for_messages_ready/3,
wait_for_messages_pending_ack/3,
+ wait_for_messages_total/3,
dirty_query/3,
ra_name/1
]).
@@ -17,6 +18,10 @@ wait_for_messages_pending_ack(Servers, QName, Ready) ->
wait_for_messages(Servers, QName, Ready,
fun rabbit_fifo:query_messages_checked_out/1, 60).
+wait_for_messages_total(Servers, QName, Total) ->
+ wait_for_messages(Servers, QName, Total,
+ fun rabbit_fifo:query_messages_total/1, 60).
+
wait_for_messages(Servers, QName, Number, Fun, 0) ->
Msgs = dirty_query(Servers, QName, Fun),
Totals = lists:map(fun(M) when is_map(M) ->
@@ -28,8 +33,8 @@ wait_for_messages(Servers, QName, Number, Fun, 0) ->
wait_for_messages(Servers, QName, Number, Fun, N) ->
Msgs = dirty_query(Servers, QName, Fun),
ct:pal("Got messages ~p", [Msgs]),
- case lists:all(fun(M) when is_map(M) ->
- maps:size(M) == Number;
+ case lists:all(fun(C) when is_integer(C) ->
+ C == Number;
(_) ->
false
end, Msgs) of
diff --git a/test/rabbit_fifo_SUITE.erl b/test/rabbit_fifo_SUITE.erl
index 0512e8161a..9f1f3a4797 100644
--- a/test/rabbit_fifo_SUITE.erl
+++ b/test/rabbit_fifo_SUITE.erl
@@ -300,6 +300,7 @@ returns_after_down(Config) ->
Self ! checkout_done
end),
receive checkout_done -> ok after 1000 -> exit(checkout_done_timeout) end,
+ timer:sleep(1000),
% message should be available for dequeue
{ok, {_, {_, msg1}}, _} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F2),
ra:stop_server(ServerId),
@@ -481,15 +482,15 @@ test_queries(Config) ->
F0 = rabbit_fifo_client:init(ClusterName, [ServerId], 4),
{ok, _} = rabbit_fifo_client:checkout(<<"tag">>, 1, undefined, F0),
{ok, {_, Ready}, _} = ra:local_query(ServerId,
- fun rabbit_fifo:query_messages_ready/1),
- ?assertEqual(1, maps:size(Ready)),
+ fun rabbit_fifo:query_messages_ready/1),
+ ?assertEqual(1, Ready),
ct:pal("Ready ~w~n", [Ready]),
{ok, {_, Checked}, _} = ra:local_query(ServerId,
- fun rabbit_fifo:query_messages_checked_out/1),
- ?assertEqual(1, maps:size(Checked)),
+ fun rabbit_fifo:query_messages_checked_out/1),
+ ?assertEqual(1, Checked),
ct:pal("Checked ~w~n", [Checked]),
{ok, {_, Processes}, _} = ra:local_query(ServerId,
- fun rabbit_fifo:query_processes/1),
+ fun rabbit_fifo:query_processes/1),
ct:pal("Processes ~w~n", [Processes]),
?assertEqual(2, length(Processes)),
P ! stop,
diff --git a/test/rabbit_fifo_prop_SUITE.erl b/test/rabbit_fifo_prop_SUITE.erl
index a8604b46af..5643da1991 100644
--- a/test/rabbit_fifo_prop_SUITE.erl
+++ b/test/rabbit_fifo_prop_SUITE.erl
@@ -25,7 +25,17 @@ all_tests() ->
scenario1,
scenario2,
scenario3,
- scenario4
+ scenario4,
+ scenario5,
+ scenario6,
+ scenario7,
+ scenario8,
+ scenario9,
+ scenario10,
+ scenario11,
+ scenario12,
+ scenario13,
+ scenario14
].
groups() ->
@@ -73,7 +83,7 @@ scenario1(_Config) ->
make_return(C2, [1]), %% E2 in returns E1 with C2
make_settle(C2, [2]) %% E2 with C2
],
- run_snapshot_test(?FUNCTION_NAME, Commands),
+ run_snapshot_test(#{name => ?FUNCTION_NAME}, Commands),
ok.
scenario2(_Config) ->
@@ -88,7 +98,7 @@ scenario2(_Config) ->
make_settle(C1, [0]),
make_settle(C2, [0])
],
- run_snapshot_test(?FUNCTION_NAME, Commands),
+ run_snapshot_test(#{name => ?FUNCTION_NAME}, Commands),
ok.
scenario3(_Config) ->
@@ -102,7 +112,7 @@ scenario3(_Config) ->
make_settle(C1, [1]),
make_settle(C1, [2])
],
- run_snapshot_test(?FUNCTION_NAME, Commands),
+ run_snapshot_test(#{name => ?FUNCTION_NAME}, Commands),
ok.
scenario4(_Config) ->
@@ -112,19 +122,147 @@ scenario4(_Config) ->
make_enqueue(E,1,msg),
make_settle(C1, [0])
],
- run_snapshot_test(?FUNCTION_NAME, Commands),
+ run_snapshot_test(#{name => ?FUNCTION_NAME}, Commands),
+ ok.
+
+scenario5(_Config) ->
+ C1 = {<<>>, c:pid(0,505,0)},
+ E = c:pid(0,465,9),
+ Commands = [make_enqueue(E,1,<<0>>),
+ make_checkout(C1, {auto,1,simple_prefetch}),
+ make_enqueue(E,2,<<>>),
+ make_settle(C1,[0])],
+ run_snapshot_test(#{name => ?FUNCTION_NAME}, Commands),
+ ok.
+
+scenario6(_Config) ->
+ E = c:pid(0,465,9),
+ Commands = [make_enqueue(E,1,<<>>), %% 1 msg on queue - snap: prefix 1
+ make_enqueue(E,2,<<>>) %% 1. msg on queue - snap: prefix 1
+ ],
+ run_snapshot_test(#{name => ?FUNCTION_NAME,
+ max_length => 1}, Commands),
+ ok.
+
+scenario7(_Config) ->
+ C1 = {<<>>, c:pid(0,208,0)},
+ E = c:pid(0,188,0),
+ Commands = [
+ make_enqueue(E,1,<<>>),
+ make_checkout(C1, {auto,1,simple_prefetch}),
+ make_enqueue(E,2,<<>>),
+ make_enqueue(E,3,<<>>),
+ make_settle(C1,[0])],
+ run_snapshot_test(#{name => ?FUNCTION_NAME,
+ max_length => 1}, Commands),
+ ok.
+
+scenario8(_Config) ->
+ C1 = {<<>>, c:pid(0,208,0)},
+ E = c:pid(0,188,0),
+ Commands = [
+ make_enqueue(E,1,<<>>),
+ make_enqueue(E,2,<<>>),
+ make_checkout(C1, {auto,1,simple_prefetch}),
+ % make_checkout(C1, cancel),
+ {down, E, noconnection},
+ make_settle(C1, [0])],
+ run_snapshot_test(#{name => ?FUNCTION_NAME,
+ max_length => 1}, Commands),
+ ok.
+
+scenario9(_Config) ->
+ E = c:pid(0,188,0),
+ Commands = [
+ make_enqueue(E,1,<<>>),
+ make_enqueue(E,2,<<>>),
+ make_enqueue(E,3,<<>>)],
+ run_snapshot_test(#{name => ?FUNCTION_NAME,
+ max_length => 1}, Commands),
+ ok.
+
+scenario10(_Config) ->
+ C1 = {<<>>, c:pid(0,208,0)},
+ E = c:pid(0,188,0),
+ Commands = [
+ make_checkout(C1, {auto,1,simple_prefetch}),
+ make_enqueue(E,1,<<>>),
+ make_settle(C1, [0])
+ ],
+ run_snapshot_test(#{name => ?FUNCTION_NAME,
+ max_length => 1}, Commands),
+ ok.
+
+scenario11(_Config) ->
+ C1 = {<<>>, c:pid(0,215,0)},
+ E = c:pid(0,217,0),
+ Commands = [
+ make_enqueue(E,1,<<>>),
+ make_checkout(C1, {auto,1,simple_prefetch}),
+ make_checkout(C1, cancel),
+ make_enqueue(E,2,<<>>),
+ make_checkout(C1, {auto,1,simple_prefetch}),
+ make_settle(C1, [0]),
+ make_checkout(C1, cancel)
+ ],
+ run_snapshot_test(#{name => ?FUNCTION_NAME,
+ max_length => 2}, Commands),
+ ok.
+
+scenario12(_Config) ->
+ E = c:pid(0,217,0),
+ Commands = [make_enqueue(E,1,<<0>>),
+ make_enqueue(E,2,<<0>>),
+ make_enqueue(E,3,<<0>>)],
+ run_snapshot_test(#{name => ?FUNCTION_NAME,
+ max_bytes => 2}, Commands),
+ ok.
+
+scenario13(_Config) ->
+ E = c:pid(0,217,0),
+ Commands = [make_enqueue(E,1,<<0>>),
+ make_enqueue(E,2,<<>>),
+ make_enqueue(E,3,<<>>),
+ make_enqueue(E,4,<<>>)
+ ],
+ run_snapshot_test(#{name => ?FUNCTION_NAME,
+ max_length => 2}, Commands),
+ ok.
+
+scenario14(_Config) ->
+ E = c:pid(0,217,0),
+ Commands = [make_enqueue(E,1,<<0,0>>)],
+ run_snapshot_test(#{name => ?FUNCTION_NAME,
+ max_bytes => 1}, Commands),
ok.
snapshots(_Config) ->
run_proper(
fun () ->
- ?FORALL(O, ?LET(Ops, log_gen(), expand(Ops)),
- test1_prop(O))
- end, [], 1000).
-
-test1_prop(Commands) ->
- ct:pal("Commands: ~p~n", [Commands]),
- try run_snapshot_test(?FUNCTION_NAME, Commands) of
+ ?FORALL({Length, Bytes, SingleActiveConsumer},
+ frequency([{10, {0, 0, false}},
+ {5, {non_neg_integer(), non_neg_integer(),
+ boolean()}}]),
+ ?FORALL(O, ?LET(Ops, log_gen(200), expand(Ops)),
+ collect({Length, Bytes},
+ snapshots_prop(
+ config(?FUNCTION_NAME,
+ Length, Bytes,
+ SingleActiveConsumer), O))))
+ end, [], 2000).
+
+config(Name, Length, Bytes, SingleActive) ->
+ #{name => Name,
+ max_length => map_max(Length),
+ max_bytes => map_max(Bytes),
+ single_active_consumer_on => SingleActive}.
+
+map_max(0) -> undefined;
+map_max(N) -> N.
+
+snapshots_prop(Conf, Commands) ->
+ ct:pal("Commands: ~p~nConf~p~n", [Commands, Conf]),
+ try run_snapshot_test(Conf, Commands) of
_ -> true
catch
Err ->
@@ -132,10 +270,10 @@ test1_prop(Commands) ->
false
end.
-log_gen() ->
+log_gen(Size) ->
?LET(EPids, vector(2, pid_gen()),
?LET(CPids, vector(2, pid_gen()),
- resize(200,
+ resize(Size,
list(
frequency(
[{20, enqueue_gen(oneof(EPids))},
@@ -157,15 +295,17 @@ down_gen(Pid) ->
?LET(E, {down, Pid, oneof([noconnection, noproc])}, E).
enqueue_gen(Pid) ->
- ?LET(E, {enqueue, Pid, frequency([{10, enqueue},
- {1, delay}])}, E).
+ ?LET(E, {enqueue, Pid,
+ frequency([{10, enqueue},
+ {1, delay}]),
+ binary()}, E).
checkout_cancel_gen(Pid) ->
{checkout, Pid, cancel}.
checkout_gen(Pid) ->
%% pid, tag, prefetch
- ?LET(C, {checkout, {binary(), Pid}, choose(1, 10)}, C).
+ ?LET(C, {checkout, {binary(), Pid}, choose(1, 100)}, C).
-record(t, {state = rabbit_fifo:init(#{name => proper,
@@ -193,9 +333,10 @@ expand(Ops) ->
lists:reverse(Log).
-handle_op({enqueue, Pid, When}, #t{enqueuers = Enqs0,
- down = Down,
- effects = Effs} = T) ->
+handle_op({enqueue, Pid, When, Data},
+ #t{enqueuers = Enqs0,
+ down = Down,
+ effects = Effs} = T) ->
case Down of
#{Pid := noproc} ->
%% if it's a noproc then it cannot exist - can it?
@@ -204,13 +345,12 @@ handle_op({enqueue, Pid, When}, #t{enqueuers = Enqs0,
_ ->
Enqs = maps:update_with(Pid, fun (Seq) -> Seq + 1 end, 1, Enqs0),
MsgSeq = maps:get(Pid, Enqs),
- Cmd = rabbit_fifo:make_enqueue(Pid, MsgSeq, msg),
+ Cmd = rabbit_fifo:make_enqueue(Pid, MsgSeq, Data),
case When of
enqueue ->
do_apply(Cmd, T#t{enqueuers = Enqs});
delay ->
%% just put the command on the effects queue
- ct:pal("delaying ~w", [Cmd]),
T#t{effects = queue:in(Cmd, Effs)}
end
end;
@@ -308,7 +448,6 @@ enq_effs([{send_msg, P, {delivery, CTag, Msgs}, ra_event} | Rem], Q) ->
Cmd = rabbit_fifo:make_settle({CTag, P}, MsgIds),
enq_effs(Rem, queue:in(Cmd, Q));
enq_effs([_ | Rem], Q) ->
- % ct:pal("enq_effs dropping ~w~n", [E]),
enq_effs(Rem, Q).
@@ -323,29 +462,40 @@ run_proper(Fun, Args, NumTests) ->
(F, A) -> ct:pal(?LOW_IMPORTANCE, F, A)
end}])).
-run_snapshot_test(Name, Commands) ->
+run_snapshot_test(Conf, Commands) ->
%% create every incremental permuation of the commands lists
%% and run the snapshot tests against that
[begin
% ?debugFmt("~w running command to ~w~n", [?FUNCTION_NAME, lists:last(C)]),
- run_snapshot_test0(Name, C)
+ run_snapshot_test0(Conf, C)
end || C <- prefixes(Commands, 1, [])].
-run_snapshot_test0(Name, Commands) ->
+run_snapshot_test0(Conf, Commands) ->
Indexes = lists:seq(1, length(Commands)),
Entries = lists:zip(Indexes, Commands),
- {State, Effects} = run_log(test_init(Name), Entries),
+ {State, Effects} = run_log(test_init(Conf), Entries),
+ % ct:pal("beginning snapshot test run for ~w numn commands ~b",
+ % [maps:get(name, Conf), length(Commands)]),
[begin
+ %% drop all entries below and including the snapshot
Filtered = lists:dropwhile(fun({X, _}) when X =< SnapIdx -> true;
(_) -> false
end, Entries),
{S, _} = run_log(SnapState, Filtered),
% assert log can be restored from any release cursor index
- ?assertEqual(State, S)
+ case S of
+ State -> ok;
+ _ ->
+ ct:pal("Snapshot tests failed run log:~n"
+ "~p~n from ~n~p~n Entries~n~p~n",
+ [Filtered, SnapState, Entries]),
+ ?assertEqual(State, S)
+ end
end || {release_cursor, SnapIdx, SnapState} <- Effects],
ok.
+%% transforms [1,2,3] into [[1,2,3], [1,2], [1]]
prefixes(Source, N, Acc) when N > length(Source) ->
lists:reverse(Acc);
prefixes(Source, N, Acc) ->
@@ -364,11 +514,12 @@ run_log(InitState, Entries) ->
end
end, {InitState, []}, Entries).
-test_init(Name) ->
- rabbit_fifo:init(#{name => Name,
- queue_resource => blah,
- shadow_copy_interval => 0,
- metrics_handler => {?MODULE, metrics_handler, []}}).
+test_init(Conf) ->
+ Default = #{queue_resource => blah,
+ shadow_copy_interval => 0,
+ metrics_handler => {?MODULE, metrics_handler, []}},
+ rabbit_fifo:init(maps:merge(Default, Conf)).
+
meta(Idx) ->
#{index => Idx, term => 1}.
diff --git a/test/unit_inbroker_parallel_SUITE.erl b/test/unit_inbroker_parallel_SUITE.erl
index 0343e7d136..581440d179 100644
--- a/test/unit_inbroker_parallel_SUITE.erl
+++ b/test/unit_inbroker_parallel_SUITE.erl
@@ -35,9 +35,15 @@ all() ->
].
groups() ->
- MaxLengthTests = [max_length_drop_head,
+ MaxLengthTests = [max_length_default,
+ max_length_bytes_default,
+ max_length_drop_head,
+ max_length_bytes_drop_head,
max_length_reject_confirm,
- max_length_drop_publish],
+ max_length_bytes_reject_confirm,
+ max_length_drop_publish,
+ max_length_drop_publish_requeue,
+ max_length_bytes_drop_publish],
[
{parallel_tests, [parallel], [
amqp_connection_refusal,
@@ -59,11 +65,16 @@ groups() ->
set_disk_free_limit_command,
set_vm_memory_high_watermark_command,
topic_matching,
+ max_message_size,
+
{queue_max_length, [], [
- {max_length_simple, [], MaxLengthTests},
- {max_length_mirrored, [], MaxLengthTests}]},
- max_message_size
- ]}
+ {max_length_classic, [], MaxLengthTests},
+ {max_length_quorum, [], [max_length_default,
+ max_length_bytes_default]
+ },
+ {max_length_mirrored, [], MaxLengthTests}
+ ]}
+ ]}
].
suite() ->
@@ -82,10 +93,23 @@ init_per_suite(Config) ->
end_per_suite(Config) ->
rabbit_ct_helpers:run_teardown_steps(Config).
+init_per_group(max_length_classic, Config) ->
+ rabbit_ct_helpers:set_config(
+ Config,
+ [{queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]},
+ {queue_durable, false}]);
+init_per_group(max_length_quorum, Config) ->
+ rabbit_ct_helpers:set_config(
+ Config,
+ [{queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}]},
+ {queue_durable, true}]);
init_per_group(max_length_mirrored, Config) ->
rabbit_ct_broker_helpers:set_ha_policy(Config, 0, <<"^max_length.*queue">>,
<<"all">>, [{<<"ha-sync-mode">>, <<"automatic">>}]),
- Config1 = rabbit_ct_helpers:set_config(Config, [{is_mirrored, true}]),
+ Config1 = rabbit_ct_helpers:set_config(
+ Config, [{is_mirrored, true},
+ {queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]},
+ {queue_durable, false}]),
rabbit_ct_helpers:run_steps(Config1, []);
init_per_group(Group, Config) ->
case lists:member({group, Group}, all()) of
@@ -132,29 +156,22 @@ end_per_group(Group, Config) ->
end.
init_per_testcase(Testcase, Config) ->
- rabbit_ct_helpers:testcase_started(Config, Testcase).
-
-end_per_testcase(max_length_drop_head = Testcase, Config) ->
+ Group = proplists:get_value(name, ?config(tc_group_properties, Config)),
+ Q = rabbit_data_coercion:to_binary(io_lib:format("~p_~p", [Group, Testcase])),
+ Config1 = rabbit_ct_helpers:set_config(Config, [{queue_name, Q}]),
+ rabbit_ct_helpers:testcase_started(Config1, Testcase).
+
+end_per_testcase(Testcase, Config)
+ when Testcase == max_length_drop_publish; Testcase == max_length_bytes_drop_publish;
+ Testcase == max_length_drop_publish_requeue;
+ Testcase == max_length_reject_confirm; Testcase == max_length_bytes_reject_confirm;
+ Testcase == max_length_drop_head; Testcase == max_length_bytes_drop_head;
+ Testcase == max_length_default; Testcase == max_length_bytes_default ->
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
- amqp_channel:call(Ch, #'queue.delete'{queue = <<"max_length_drop_head_queue">>}),
- amqp_channel:call(Ch, #'queue.delete'{queue = <<"max_length_default_drop_head_queue">>}),
- amqp_channel:call(Ch, #'queue.delete'{queue = <<"max_length_bytes_drop_head_queue">>}),
- amqp_channel:call(Ch, #'queue.delete'{queue = <<"max_length_bytes_default_drop_head_queue">>}),
+ amqp_channel:call(Ch, #'queue.delete'{queue = ?config(queue_name, Config)}),
rabbit_ct_client_helpers:close_channels_and_connection(Config, 0),
rabbit_ct_helpers:testcase_finished(Config, Testcase);
-end_per_testcase(max_length_reject_confirm = Testcase, Config) ->
- {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
- amqp_channel:call(Ch, #'queue.delete'{queue = <<"max_length_reject_queue">>}),
- amqp_channel:call(Ch, #'queue.delete'{queue = <<"max_length_bytes_reject_queue">>}),
- rabbit_ct_client_helpers:close_channels_and_connection(Config, 0),
- rabbit_ct_helpers:testcase_finished(Config, Testcase);
-end_per_testcase(max_length_drop_publish = Testcase, Config) ->
- {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
- amqp_channel:call(Ch, #'queue.delete'{queue = <<"max_length_drop_publish_queue">>}),
- amqp_channel:call(Ch, #'queue.delete'{queue = <<"max_length_bytes_drop_publish_queue">>}),
- rabbit_ct_client_helpers:close_channels_and_connection(Config, 0),
- rabbit_ct_helpers:testcase_finished(Config, Testcase);
end_per_testcase(Testcase, Config) ->
rabbit_ct_helpers:testcase_finished(Config, Testcase).
@@ -1159,43 +1176,66 @@ set_vm_memory_high_watermark_command1(_Config) ->
)
end.
-max_length_drop_head(Config) ->
+max_length_bytes_drop_head(Config) ->
+ max_length_bytes_drop_head(Config, [{<<"x-overflow">>, longstr, <<"drop-head">>}]).
+
+max_length_bytes_default(Config) ->
+ max_length_bytes_drop_head(Config, []).
+
+max_length_bytes_drop_head(Config, ExtraArgs) ->
{_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
- QName = <<"max_length_drop_head_queue">>,
- QNameDefault = <<"max_length_default_drop_head_queue">>,
- QNameBytes = <<"max_length_bytes_drop_head_queue">>,
- QNameDefaultBytes = <<"max_length_bytes_default_drop_head_queue">>,
+ Args = ?config(queue_args, Config),
+ Durable = ?config(queue_durable, Config),
+ QName = ?config(queue_name, Config),
- MaxLengthArgs = [{<<"x-max-length">>, long, 1}],
MaxLengthBytesArgs = [{<<"x-max-length-bytes">>, long, 100}],
- OverflowArgs = [{<<"x-overflow">>, longstr, <<"drop-head">>}],
- #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = MaxLengthArgs ++ OverflowArgs}),
- #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QNameDefault, arguments = MaxLengthArgs}),
- #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QNameBytes, arguments = MaxLengthBytesArgs ++ OverflowArgs}),
- #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QNameDefaultBytes, arguments = MaxLengthBytesArgs}),
-
- check_max_length_drops_head(Config, QName, Ch, <<"1">>, <<"2">>, <<"3">>),
- check_max_length_drops_head(Config, QNameDefault, Ch, <<"1">>, <<"2">>, <<"3">>),
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = MaxLengthBytesArgs ++ Args ++ ExtraArgs, durable = Durable}),
%% 80 bytes payload
Payload1 = << <<"1">> || _ <- lists:seq(1, 80) >>,
Payload2 = << <<"2">> || _ <- lists:seq(1, 80) >>,
Payload3 = << <<"3">> || _ <- lists:seq(1, 80) >>,
- check_max_length_drops_head(Config, QName, Ch, Payload1, Payload2, Payload3),
- check_max_length_drops_head(Config, QNameDefault, Ch, Payload1, Payload2, Payload3).
+ check_max_length_drops_head(Config, QName, Ch, Payload1, Payload2, Payload3).
+
+max_length_drop_head(Config) ->
+ max_length_drop_head(Config, [{<<"x-overflow">>, longstr, <<"drop-head">>}]).
+
+max_length_default(Config) ->
+ %% Defaults to drop_head
+ max_length_drop_head(Config, []).
+
+max_length_drop_head(Config, ExtraArgs) ->
+ {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ Args = ?config(queue_args, Config),
+ Durable = ?config(queue_durable, Config),
+ QName = ?config(queue_name, Config),
+
+ MaxLengthArgs = [{<<"x-max-length">>, long, 1}],
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = MaxLengthArgs ++ Args ++ ExtraArgs, durable = Durable}),
+
+ check_max_length_drops_head(Config, QName, Ch, <<"1">>, <<"2">>, <<"3">>).
max_length_reject_confirm(Config) ->
{_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
- QName = <<"max_length_reject_queue">>,
- QNameBytes = <<"max_length_bytes_reject_queue">>,
+ Args = ?config(queue_args, Config),
+ QName = ?config(queue_name, Config),
+ Durable = ?config(queue_durable, Config),
MaxLengthArgs = [{<<"x-max-length">>, long, 1}],
- MaxLengthBytesArgs = [{<<"x-max-length-bytes">>, long, 100}],
OverflowArgs = [{<<"x-overflow">>, longstr, <<"reject-publish">>}],
- #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = MaxLengthArgs ++ OverflowArgs}),
- #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QNameBytes, arguments = MaxLengthBytesArgs ++ OverflowArgs}),
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = MaxLengthArgs ++ OverflowArgs ++ Args, durable = Durable}),
#'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
check_max_length_drops_publish(Config, QName, Ch, <<"1">>, <<"2">>, <<"3">>),
- check_max_length_rejects(Config, QName, Ch, <<"1">>, <<"2">>, <<"3">>),
+ check_max_length_rejects(Config, QName, Ch, <<"1">>, <<"2">>, <<"3">>).
+
+max_length_bytes_reject_confirm(Config) ->
+ {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ Args = ?config(queue_args, Config),
+ QNameBytes = ?config(queue_name, Config),
+ Durable = ?config(queue_durable, Config),
+ MaxLengthBytesArgs = [{<<"x-max-length-bytes">>, long, 100}],
+ OverflowArgs = [{<<"x-overflow">>, longstr, <<"reject-publish">>}],
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QNameBytes, arguments = MaxLengthBytesArgs ++ OverflowArgs ++ Args, durable = Durable}),
+ #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
%% 80 bytes payload
Payload1 = << <<"1">> || _ <- lists:seq(1, 80) >>,
@@ -1207,15 +1247,55 @@ max_length_reject_confirm(Config) ->
max_length_drop_publish(Config) ->
{_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
- QName = <<"max_length_drop_publish_queue">>,
- QNameBytes = <<"max_length_bytes_drop_publish_queue">>,
+ Args = ?config(queue_args, Config),
+ Durable = ?config(queue_durable, Config),
+ QName = ?config(queue_name, Config),
MaxLengthArgs = [{<<"x-max-length">>, long, 1}],
- MaxLengthBytesArgs = [{<<"x-max-length-bytes">>, long, 100}],
OverflowArgs = [{<<"x-overflow">>, longstr, <<"reject-publish">>}],
- #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = MaxLengthArgs ++ OverflowArgs}),
- #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QNameBytes, arguments = MaxLengthBytesArgs ++ OverflowArgs}),
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = MaxLengthArgs ++ OverflowArgs ++ Args, durable = Durable}),
%% If confirms are not enable, publishes will still be dropped in reject-publish mode.
- check_max_length_drops_publish(Config, QName, Ch, <<"1">>, <<"2">>, <<"3">>),
+ check_max_length_drops_publish(Config, QName, Ch, <<"1">>, <<"2">>, <<"3">>).
+
+max_length_drop_publish_requeue(Config) ->
+ {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ Args = ?config(queue_args, Config),
+ Durable = ?config(queue_durable, Config),
+ QName = ?config(queue_name, Config),
+ MaxLengthArgs = [{<<"x-max-length">>, long, 1}],
+ OverflowArgs = [{<<"x-overflow">>, longstr, <<"reject-publish">>}],
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = MaxLengthArgs ++ OverflowArgs ++ Args, durable = Durable}),
+ %% If confirms are not enable, publishes will still be dropped in reject-publish mode.
+ check_max_length_requeue(Config, QName, Ch, <<"1">>, <<"2">>).
+
+check_max_length_requeue(Config, QName, Ch, Payload1, Payload2) ->
+ sync_mirrors(QName, Config),
+ #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
+ %% A single message is published and consumed
+ amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}),
+ wait_for_consensus(QName, Config),
+ {#'basic.get_ok'{delivery_tag = DeliveryTag},
+ #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
+ #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
+
+ %% Another message is published
+ amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}),
+ wait_for_consensus(QName, Config),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
+ multiple = false,
+ requeue = true}),
+ wait_for_consensus(QName, Config),
+ {#'basic.get_ok'{}, #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
+ {#'basic.get_ok'{}, #amqp_msg{payload = Payload2}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
+ #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}).
+
+max_length_bytes_drop_publish(Config) ->
+ {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ Args = ?config(queue_args, Config),
+ Durable = ?config(queue_durable, Config),
+ QNameBytes = ?config(queue_name, Config),
+ MaxLengthBytesArgs = [{<<"x-max-length-bytes">>, long, 100}],
+ OverflowArgs = [{<<"x-overflow">>, longstr, <<"reject-publish">>}],
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QNameBytes, arguments = MaxLengthBytesArgs ++ OverflowArgs ++ Args, durable = Durable}),
%% 80 bytes payload
Payload1 = << <<"1">> || _ <- lists:seq(1, 80) >>,
@@ -1229,22 +1309,38 @@ check_max_length_drops_publish(Config, QName, Ch, Payload1, Payload2, Payload3)
#'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
%% A single message is published and consumed
amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}),
+ wait_for_consensus(QName, Config),
{#'basic.get_ok'{}, #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
#'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
%% Message 2 is dropped, message 1 stays
amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}),
+ wait_for_consensus(QName, Config),
amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}),
+ wait_for_consensus(QName, Config),
{#'basic.get_ok'{}, #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
#'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
%% Messages 2 and 3 are dropped, message 1 stays
amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}),
+ wait_for_consensus(QName, Config),
amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}),
+ wait_for_consensus(QName, Config),
amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload3}),
+ wait_for_consensus(QName, Config),
{#'basic.get_ok'{}, #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
#'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}).
+wait_for_consensus(QName, Config) ->
+ case lists:keyfind(<<"x-queue-type">>, 1, ?config(queue_args, Config)) of
+ {_, _, <<"quorum">>} ->
+ Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+ RaName = binary_to_atom(<<"%2F_", QName/binary>>, utf8),
+ {ok, _, _} = ra:members({RaName, Server});
+ _ ->
+ ok
+ end.
+
check_max_length_rejects(Config, QName, Ch, Payload1, Payload2, Payload3) ->
sync_mirrors(QName, Config),
amqp_channel:register_confirm_handler(Ch, self()),
@@ -1283,12 +1379,14 @@ check_max_length_drops_head(Config, QName, Ch, Payload1, Payload2, Payload3) ->
#'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
%% A single message is published and consumed
amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}),
+ wait_for_consensus(QName, Config),
{#'basic.get_ok'{}, #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
#'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
%% Message 1 is replaced by message 2
amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}),
amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}),
+ wait_for_consensus(QName, Config),
{#'basic.get_ok'{}, #amqp_msg{payload = Payload2}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
#'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
@@ -1296,6 +1394,7 @@ check_max_length_drops_head(Config, QName, Ch, Payload1, Payload2, Payload3) ->
amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}),
amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}),
amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload3}),
+ wait_for_consensus(QName, Config),
{#'basic.get_ok'{}, #amqp_msg{payload = Payload3}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
#'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}).