summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorkjnilsson <knilsson@pivotal.io>2020-07-10 10:20:25 +0100
committerkjnilsson <knilsson@pivotal.io>2020-09-07 09:42:10 +0100
commitc7fcee5a1f317be3f589fa739b9dbf1b6f7253ce (patch)
tree61e6ddfc2504bbc0b9f5caeb7a656c68d593d076
parent6c7970e674772457f4f3cd8c66ca21b9ac5102a3 (diff)
downloadrabbitmq-server-git-c7fcee5a1f317be3f589fa739b9dbf1b6f7253ce.tar.gz
Implement reject_publish for QQs
The reject publish overflow strategy for quorum queues is an inexact implementation that relies on the cooperation of publishing channels. When a channel first wants to publish to a quorum queue it first issues a synchronous register_enqueuer command which will return the current queue overflow state as reject_publish if the queue is full. The queue will also notify any active enqueuers when it reaches the limit but will continue to accept any enqueues it receives after that. Once the queue size goes below 80% of the limit(s) the queue will again notify enqueuers that they can resume publishing into the queue.
-rw-r--r--src/rabbit_fifo.erl209
-rw-r--r--src/rabbit_fifo.hrl12
-rw-r--r--src/rabbit_fifo_client.erl133
-rw-r--r--src/rabbit_quorum_queue.erl58
-rw-r--r--test/quorum_queue_SUITE.erl54
-rw-r--r--test/rabbit_fifo_SUITE.erl117
-rw-r--r--test/rabbit_fifo_prop_SUITE.erl21
7 files changed, 461 insertions, 143 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index 9ce39f5a64..9a2e3f3dc3 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -50,6 +50,7 @@
%% protocol helpers
make_enqueue/3,
+ make_register_enqueuer/1,
make_checkout/3,
make_settle/2,
make_return/2,
@@ -64,6 +65,7 @@
-record(enqueue, {pid :: option(pid()),
seq :: option(msg_seqno()),
msg :: raw_msg()}).
+-record(register_enqueuer, {pid :: pid()}).
-record(checkout, {consumer_id :: consumer_id(),
spec :: checkout_spec(),
meta :: consumer_meta()}).
@@ -83,6 +85,7 @@
-opaque protocol() ::
#enqueue{} |
+ #register_enqueuer{} |
#checkout{} |
#settle{} |
#return{} |
@@ -164,9 +167,27 @@ zero(_) ->
-spec apply(ra_machine:command_meta_data(), command(), state()) ->
{state(), Reply :: term(), ra_machine:effects()} |
{state(), Reply :: term()}.
-apply(Metadata, #enqueue{pid = From, seq = Seq,
- msg = RawMsg}, State00) ->
- apply_enqueue(Metadata, From, Seq, RawMsg, State00);
+apply(Meta, #enqueue{pid = From, seq = Seq,
+ msg = RawMsg}, State00) ->
+ apply_enqueue(Meta, From, Seq, RawMsg, State00);
+apply(_Meta, #register_enqueuer{pid = Pid},
+ #?MODULE{enqueuers = Enqueuers0,
+ cfg = #cfg{overflow_strategy = Overflow}} = State0) ->
+
+ State = case maps:is_key(Pid, Enqueuers0) of
+ true ->
+                    %% if the enqueuer already exists just echo the overflow state
+ State0;
+ false ->
+ State0#?MODULE{enqueuers = Enqueuers0#{Pid => #enqueuer{}}}
+ end,
+ Res = case is_over_limit(State) of
+ true when Overflow == reject_publish ->
+ reject_publish;
+ _ ->
+ ok
+ end,
+ {State, Res, [{monitor, process, Pid}]};
apply(Meta,
#settle{msg_ids = MsgIds, consumer_id = ConsumerId},
#?MODULE{consumers = Cons0} = State) ->
@@ -215,8 +236,9 @@ apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt,
ServiceQueue0),
Cons = maps:put(ConsumerId, Con1, Cons0),
{State1, ok, Effects} =
- checkout(Meta, State0#?MODULE{service_queue = ServiceQueue,
- consumers = Cons}, []),
+ checkout(Meta, State0,
+ State0#?MODULE{service_queue = ServiceQueue,
+ consumers = Cons}, []),
Response = {send_credit_reply, messages_ready(State1)},
%% by this point all checkouts for the updated credit value
%% should be processed so we can evaluate the drain
@@ -262,7 +284,8 @@ apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt,
apply(_, #checkout{spec = {dequeue, _}},
#?MODULE{cfg = #cfg{consumer_strategy = single_active}} = State0) ->
{State0, {error, unsupported}};
-apply(#{from := From} = Meta, #checkout{spec = {dequeue, Settlement},
+apply(#{index := Index,
+ from := From} = Meta, #checkout{spec = {dequeue, Settlement},
meta = ConsumerMeta,
consumer_id = ConsumerId},
#?MODULE{consumers = Consumers} = State0) ->
@@ -278,57 +301,69 @@ apply(#{from := From} = Meta, #checkout{spec = {dequeue, Settlement},
{once, 1, simple_prefetch},
State0),
{success, _, MsgId, Msg, State2} = checkout_one(State1),
- {State, Effects} = case Settlement of
- unsettled ->
- {_, Pid} = ConsumerId,
- {State2, [{monitor, process, Pid}]};
- settled ->
- %% immediately settle the checkout
- {State3, _, Effects0} =
- apply(Meta, make_settle(ConsumerId, [MsgId]),
- State2),
- {State3, Effects0}
- end,
+ {State4, Effects1} = case Settlement of
+ unsettled ->
+ {_, Pid} = ConsumerId,
+ {State2, [{monitor, process, Pid}]};
+ settled ->
+ %% immediately settle the checkout
+ {State3, _, Effects0} =
+ apply(Meta, make_settle(ConsumerId, [MsgId]),
+ State2),
+ {State3, Effects0}
+ end,
+ {Reply, Effects2} =
case Msg of
{RaftIdx, {Header, 'empty'}} ->
%% TODO add here new log effect with reply
- {State, '$ra_no_reply',
- reply_log_effect(RaftIdx, MsgId, Header, Ready - 1, From)};
+ {'$ra_no_reply',
+ [reply_log_effect(RaftIdx, MsgId, Header, Ready - 1, From) |
+ Effects1]};
_ ->
- {State, {dequeue, {MsgId, Msg}, Ready-1}, Effects}
+ {{dequeue, {MsgId, Msg}, Ready-1}, Effects1}
+
+ end,
+
+ case evaluate_limit(Index, false, State0, State4, Effects2) of
+ {State, true, Effects} ->
+ update_smallest_raft_index(Index, Reply, State, Effects);
+ {State, false, Effects} ->
+ {State, Reply, Effects}
end
end;
apply(Meta, #checkout{spec = cancel, consumer_id = ConsumerId}, State0) ->
{State, Effects} = cancel_consumer(ConsumerId, State0, [], consumer_cancel),
- checkout(Meta, State, Effects);
+ checkout(Meta, State0, State, Effects);
apply(Meta, #checkout{spec = Spec, meta = ConsumerMeta,
consumer_id = {_, Pid} = ConsumerId},
State0) ->
State1 = update_consumer(ConsumerId, ConsumerMeta, Spec, State0),
- checkout(Meta, State1, [{monitor, process, Pid}]);
-apply(#{index := RaftIdx}, #purge{},
+ checkout(Meta, State0, State1, [{monitor, process, Pid}]);
+apply(#{index := Index}, #purge{},
#?MODULE{ra_indexes = Indexes0,
returns = Returns,
messages = Messages} = State0) ->
Total = messages_ready(State0),
Indexes1 = lists:foldl(fun rabbit_fifo_index:delete/2, Indexes0,
- [I || {_, {I, _}} <- lqueue:to_list(Messages)]),
+ [I || {_, {I, _}} <- lqueue:to_list(Messages)]),
Indexes = lists:foldl(fun rabbit_fifo_index:delete/2, Indexes1,
[I || {_, {I, _}} <- lqueue:to_list(Returns)]),
- {State, _, Effects} =
- update_smallest_raft_index(RaftIdx,
- State0#?MODULE{ra_indexes = Indexes,
- messages = lqueue:new(),
- returns = lqueue:new(),
- msg_bytes_enqueue = 0,
- prefix_msgs = {0, [], 0, []},
- msg_bytes_in_memory = 0,
- msgs_ready_in_memory = 0},
- []),
- %% as we're not checking out after a purge (no point) we have to
- %% reverse the effects ourselves
- {State, {purge, Total},
- lists:reverse([garbage_collection | Effects])};
+
+ State1 = State0#?MODULE{ra_indexes = Indexes,
+ messages = lqueue:new(),
+ returns = lqueue:new(),
+ msg_bytes_enqueue = 0,
+ prefix_msgs = {0, [], 0, []},
+ msg_bytes_in_memory = 0,
+ msgs_ready_in_memory = 0},
+ Effects0 = [garbage_collection],
+ Reply = {purge, Total},
+ case evaluate_limit(Index, false, State0, State1, Effects0) of
+ {State, true, Effects} ->
+ update_smallest_raft_index(Index, Reply, State, Effects);
+ {State, false, Effects} ->
+ {State, Reply, Effects}
+ end;
apply(Meta, {down, Pid, noconnection},
#?MODULE{consumers = Cons0,
cfg = #cfg{consumer_strategy = single_active},
@@ -375,7 +410,7 @@ apply(Meta, {down, Pid, noconnection},
(_, E) -> E
end, Enqs0),
Effects = [{monitor, node, Node} | Effects1],
- checkout(Meta, State#?MODULE{enqueuers = Enqs}, Effects);
+ checkout(Meta, State0, State#?MODULE{enqueuers = Enqs}, Effects);
apply(Meta, {down, Pid, noconnection},
#?MODULE{consumers = Cons0,
enqueuers = Enqs0} = State0) ->
@@ -418,10 +453,10 @@ apply(Meta, {down, Pid, noconnection},
_ ->
[{monitor, node, Node}]
end ++ Effects1,
- checkout(Meta, State#?MODULE{enqueuers = Enqs}, Effects);
+ checkout(Meta, State0, State#?MODULE{enqueuers = Enqs}, Effects);
apply(Meta, {down, Pid, _Info}, State0) ->
{State, Effects} = handle_down(Pid, State0),
- checkout(Meta, State, Effects);
+ checkout(Meta, State0, State, Effects);
apply(Meta, {nodeup, Node}, #?MODULE{consumers = Cons0,
enqueuers = Enqs0,
service_queue = SQ0} = State0) ->
@@ -456,7 +491,7 @@ apply(Meta, {nodeup, Node}, #?MODULE{consumers = Cons0,
service_queue = SQ,
waiting_consumers = Waiting},
{State, Effects} = activate_next_consumer(State1, Effects1),
- checkout(Meta, State, Effects);
+ checkout(Meta, State0, State, Effects);
apply(_, {nodedown, _Node}, State) ->
{State, ok};
apply(_, #purge_nodes{nodes = Nodes}, State0) ->
@@ -465,7 +500,7 @@ apply(_, #purge_nodes{nodes = Nodes}, State0) ->
end, {State0, []}, Nodes),
{State, ok, Effects};
apply(Meta, #update_config{config = Conf}, State) ->
- checkout(Meta, update_config(Conf, State), []);
+ checkout(Meta, State, update_config(Conf, State), []);
apply(_Meta, {machine_version, 0, 1}, V0State) ->
State = convert_v0_to_v1(V0State),
{State, ok, []}.
@@ -1011,7 +1046,7 @@ apply_enqueue(#{index := RaftIdx} = Meta, From, Seq, RawMsg, State0) ->
case maybe_enqueue(RaftIdx, From, Seq, RawMsg, [], State0) of
{ok, State1, Effects1} ->
State2 = append_to_master_index(RaftIdx, State1),
- {State, ok, Effects} = checkout(Meta, State2, Effects1),
+ {State, ok, Effects} = checkout(Meta, State0, State2, Effects1),
{maybe_store_dehydrated_state(RaftIdx, State), ok, Effects};
{duplicate, State, Effects} ->
{State, ok, Effects}
@@ -1173,7 +1208,7 @@ return(#{index := IncomingRaftIdx} = Meta, ConsumerId, Returned,
_ ->
{State1, Effects1}
end,
- {State, ok, Effects} = checkout(Meta, State2, Effects3),
+ {State, ok, Effects} = checkout(Meta, State0, State2, Effects3),
update_smallest_raft_index(IncomingRaftIdx, State, Effects).
% used to processes messages that are finished
@@ -1221,7 +1256,7 @@ complete_and_checkout(#{index := IncomingRaftIdx} = Meta, MsgIds, ConsumerId,
Discarded = maps:with(MsgIds, Checked0),
{State2, Effects1} = complete(ConsumerId, Discarded, Con0,
Effects0, State0),
- {State, ok, Effects} = checkout(Meta, State2, Effects1),
+ {State, ok, Effects} = checkout(Meta, State0, State2, Effects1),
update_smallest_raft_index(IncomingRaftIdx, State, Effects).
dead_letter_effects(_Reason, _Discarded,
@@ -1257,7 +1292,10 @@ cancel_consumer_effects(ConsumerId,
[{mod_call, rabbit_quorum_queue,
cancel_consumer_handler, [QName, ConsumerId]} | Effects].
-update_smallest_raft_index(IncomingRaftIdx,
+update_smallest_raft_index(Idx, State, Effects) ->
+ update_smallest_raft_index(Idx, ok, State, Effects).
+
+update_smallest_raft_index(IncomingRaftIdx, Reply,
#?MODULE{ra_indexes = Indexes,
release_cursors = Cursors0} = State0,
Effects) ->
@@ -1267,17 +1305,16 @@ update_smallest_raft_index(IncomingRaftIdx,
% we can forward release_cursor all the way until
% the last received command, hooray
State = State0#?MODULE{release_cursors = lqueue:new()},
- {State, ok, Effects ++ [{release_cursor, IncomingRaftIdx, State}]};
+ {State, Reply, Effects ++ [{release_cursor, IncomingRaftIdx, State}]};
_ ->
Smallest = rabbit_fifo_index:smallest(Indexes),
case find_next_cursor(Smallest, Cursors0) of
{empty, Cursors} ->
- {State0#?MODULE{release_cursors = Cursors},
- ok, Effects};
+ {State0#?MODULE{release_cursors = Cursors}, Reply, Effects};
{Cursor, Cursors} ->
%% we can emit a release cursor when we've passed the smallest
%% release cursor available.
- {State0#?MODULE{release_cursors = Cursors}, ok,
+ {State0#?MODULE{release_cursors = Cursors}, Reply,
Effects ++ [Cursor]}
end
end.
@@ -1382,10 +1419,10 @@ return_all(#?MODULE{consumers = Cons} = State0, Effects0, ConsumerId,
end, {State, Effects0}, Checked).
%% checkout new messages to consumers
-checkout(#{index := Index}, State0, Effects0) ->
+checkout(#{index := Index}, OldState, State0, Effects0) ->
{State1, _Result, Effects1} = checkout0(checkout_one(State0),
Effects0, {#{}, #{}}),
- case evaluate_limit(false, State1, Effects1) of
+ case evaluate_limit(Index, false, OldState, State1, Effects1) of
{State, true, Effects} ->
update_smallest_raft_index(Index, State, Effects);
{State, false, Effects} ->
@@ -1418,16 +1455,54 @@ checkout0({Activity, State0}, Effects0, {SendAcc, LogAcc}) ->
end,
{State0, ok, lists:reverse(Effects1)}.
-evaluate_limit(Result,
+evaluate_limit(_Index, Result, _BeforeState,
#?MODULE{cfg = #cfg{max_length = undefined,
max_bytes = undefined}} = State,
Effects) ->
{State, Result, Effects};
-evaluate_limit(Result, State0, Effects0) ->
+evaluate_limit(Index, Result, BeforeState,
+ #?MODULE{cfg = #cfg{overflow_strategy = Strategy},
+ enqueuers = Enqs0} = State0,
+ Effects0) ->
case is_over_limit(State0) of
- true ->
+ true when Strategy == drop_head ->
{State, Effects} = drop_head(State0, Effects0),
- evaluate_limit(true, State, Effects);
+ evaluate_limit(Index, true, BeforeState, State, Effects);
+ true when Strategy == reject_publish ->
+ %% generate send_msg effect for each enqueuer to let them know
+ %% they need to block
+ {Enqs, Effects} =
+ maps:fold(
+ fun (P, #enqueuer{blocked = undefined} = E0, {Enqs, Acc}) ->
+ E = E0#enqueuer{blocked = Index},
+ {Enqs#{P => E},
+ [{send_msg, P, {queue_status, reject_publish},
+ [ra_event]} | Acc]};
+ (_P, _E, Acc) ->
+ Acc
+ end, {Enqs0, Effects0}, Enqs0),
+ {State0#?MODULE{enqueuers = Enqs}, Result, Effects};
+ false when Strategy == reject_publish ->
+ %% TODO: optimise as this case gets called for every command
+ %% pretty much
+ Before = is_below_soft_limit(BeforeState),
+ case {Before, is_below_soft_limit(State0)} of
+ {false, true} ->
+ %% we have moved below the lower limit which
+ {Enqs, Effects} =
+ maps:fold(
+ fun (P, #enqueuer{} = E0, {Enqs, Acc}) ->
+ E = E0#enqueuer{blocked = undefined},
+ {Enqs#{P => E},
+ [{send_msg, P, {queue_status, go}, [ra_event]}
+ | Acc]};
+ (_P, _E, Acc) ->
+ Acc
+ end, {Enqs0, Effects0}, Enqs0),
+ {State0#?MODULE{enqueuers = Enqs}, Result, Effects};
+ _ ->
+ {State0, Result, Effects0}
+ end;
false ->
{State0, Result, Effects0}
end.
@@ -1753,12 +1828,30 @@ is_over_limit(#?MODULE{cfg = #cfg{max_length = undefined,
is_over_limit(#?MODULE{cfg = #cfg{max_length = MaxLength,
max_bytes = MaxBytes},
msg_bytes_enqueue = BytesEnq} = State) ->
-
messages_ready(State) > MaxLength orelse (BytesEnq > MaxBytes).
+is_below_soft_limit(#?MODULE{cfg = #cfg{max_length = undefined,
+ max_bytes = undefined}}) ->
+ false;
+is_below_soft_limit(#?MODULE{cfg = #cfg{max_length = MaxLength,
+ max_bytes = MaxBytes},
+ msg_bytes_enqueue = BytesEnq} = State) ->
+ is_below(MaxLength, messages_ready(State)) andalso
+ is_below(MaxBytes, BytesEnq).
+
+is_below(undefined, _Num) ->
+ true;
+is_below(Val, Num) when is_integer(Val) andalso is_integer(Num) ->
+ Num =< trunc(Val * ?LOW_LIMIT).
+
-spec make_enqueue(option(pid()), option(msg_seqno()), raw_msg()) -> protocol().
make_enqueue(Pid, Seq, Msg) ->
#enqueue{pid = Pid, seq = Seq, msg = Msg}.
+
+-spec make_register_enqueuer(pid()) -> protocol().
+make_register_enqueuer(Pid) ->
+ #register_enqueuer{pid = Pid}.
+
-spec make_checkout(consumer_id(),
checkout_spec(), consumer_meta()) -> protocol().
make_checkout(ConsumerId, Spec, Meta) ->
diff --git a/src/rabbit_fifo.hrl b/src/rabbit_fifo.hrl
index e337540c35..dc01b6ec1c 100644
--- a/src/rabbit_fifo.hrl
+++ b/src/rabbit_fifo.hrl
@@ -75,6 +75,7 @@
-define(GC_MEM_LIMIT_B, 2000000).
-define(MB, 1048576).
+-define(LOW_LIMIT, 0.8).
-record(consumer,
{meta = #{} :: consumer_meta(),
@@ -105,11 +106,11 @@
% out of order enqueues - sorted list
pending = [] :: [{msg_seqno(), ra:index(), raw_msg()}],
status = up :: up |
- suspected_down |
- %% it is useful to have a record of when this was blocked
- %% so that we can retry sending the block effect if
- %% the publisher did not receive the initial one
- {blocked, At :: ra:index()}
+ suspected_down,
+ %% it is useful to have a record of when this was blocked
+ %% so that we can retry sending the block effect if
+ %% the publisher did not receive the initial one
+ blocked :: undefined | ra:index()
}).
-record(cfg,
@@ -191,5 +192,6 @@
max_bytes => non_neg_integer(),
max_in_memory_length => non_neg_integer(),
max_in_memory_bytes => non_neg_integer(),
+ overflow_strategy => drop_head | reject_publish,
single_active_consumer_on => boolean(),
delivery_limit => non_neg_integer()}.
diff --git a/src/rabbit_fifo_client.erl b/src/rabbit_fifo_client.erl
index 2ac9f6787a..a698f93e9e 100644
--- a/src/rabbit_fifo_client.erl
+++ b/src/rabbit_fifo_client.erl
@@ -53,9 +53,17 @@
-record(consumer, {last_msg_id :: seq() | -1,
delivery_count = 0 :: non_neg_integer()}).
--record(state, {cluster_name :: cluster_name(),
- servers = [] :: [ra:server_id()],
+-record(cfg, {cluster_name :: cluster_name(),
+ servers = [] :: [ra:server_id()],
+ soft_limit = ?SOFT_LIMIT :: non_neg_integer(),
+ block_handler = fun() -> ok end :: fun(() -> term()),
+ unblock_handler = fun() -> ok end :: fun(() -> ok),
+ timeout :: non_neg_integer(),
+ version = 0 :: non_neg_integer()}).
+
+-record(state, {cfg :: #cfg{},
leader :: undefined | ra:server_id(),
+ queue_status :: undefined | go | reject_publish,
next_seq = 0 :: seq(),
%% Last applied is initialise to -1 to note that no command has yet been
%% applied, but allowing to resend messages if the first ones on the sequence
@@ -66,15 +74,11 @@
slow = false :: boolean(),
unsent_commands = #{} :: #{rabbit_fifo:consumer_id() =>
{[seq()], [seq()], [seq()]}},
- soft_limit = ?SOFT_LIMIT :: non_neg_integer(),
pending = #{} :: #{seq() =>
{term(), rabbit_fifo:command()}},
consumer_deliveries = #{} :: #{rabbit_fifo:consumer_tag() =>
#consumer{}},
- block_handler = fun() -> ok end :: fun(() -> term()),
- unblock_handler = fun() -> ok end :: fun(() -> ok),
- timer_state :: term(),
- timeout :: non_neg_integer()
+ timer_state :: term()
}).
-opaque state() :: #state{}.
@@ -103,21 +107,22 @@ init(ClusterName, Servers) ->
-spec init(cluster_name(), [ra:server_id()], non_neg_integer()) -> state().
init(ClusterName = #resource{}, Servers, SoftLimit) ->
Timeout = application:get_env(kernel, net_ticktime, 60000) + 5000,
- #state{cluster_name = ClusterName,
- servers = Servers,
- soft_limit = SoftLimit,
- timeout = Timeout}.
+ #state{cfg = #cfg{cluster_name = ClusterName,
+ servers = Servers,
+ soft_limit = SoftLimit,
+ timeout = Timeout}}.
-spec init(cluster_name(), [ra:server_id()], non_neg_integer(), fun(() -> ok),
fun(() -> ok)) -> state().
init(ClusterName = #resource{}, Servers, SoftLimit, BlockFun, UnblockFun) ->
Timeout = application:get_env(kernel, net_ticktime, 60000) + 5000,
- #state{cluster_name = ClusterName,
- servers = Servers,
- block_handler = BlockFun,
- unblock_handler = UnblockFun,
- soft_limit = SoftLimit,
- timeout = Timeout}.
+ #state{cfg = #cfg{cluster_name = ClusterName,
+ servers = Servers,
+ block_handler = BlockFun,
+ unblock_handler = UnblockFun,
+ soft_limit = SoftLimit,
+ timeout = Timeout}}.
+
%% @doc Enqueues a message.
%% @param Correlation an arbitrary erlang term used to correlate this
@@ -132,10 +137,41 @@ init(ClusterName = #resource{}, Servers, SoftLimit, BlockFun, UnblockFun) ->
%% SequenceNumber can be correlated to the applied sequence numbers returned
%% by the {@link handle_ra_event/2. handle_ra_event/2} function.
-spec enqueue(Correlation :: term(), Msg :: term(), State :: state()) ->
- {ok | slow, state()}.
-enqueue(Correlation, Msg, State0 = #state{slow = Slow,
- block_handler = BlockFun}) ->
- Node = pick_node(State0),
+ {ok | slow | reject_publish, state()}.
+enqueue(Correlation, Msg,
+ #state{queue_status = undefined,
+ next_enqueue_seq = 1,
+ cfg = #cfg{}} = State0) ->
+ %% it is the first enqueue, check the version
+ {_, Node} = Server = pick_server(State0),
+ State =
+ case rpc:call(Node, rabbit_fifo, version, []) of
+ 0 ->
+ %% the leader is running the old version
+ %% so we can't initialize the enqueuer session safely
+ State0#state{queue_status = go};
+ 1 ->
+                %% we're running the new version on the leader, so do sync initialisation
+ %% of enqueuer session
+ Reg = rabbit_fifo:make_register_enqueuer(self()),
+ case ra:process_command(Server, Reg) of
+ {ok, reject_publish, _} ->
+ State0#state{queue_status = reject_publish};
+ {ok, ok, _} ->
+ State0#state{queue_status = go};
+ Err ->
+ exit(Err)
+ end
+ end,
+ enqueue(Correlation, Msg, State);
+enqueue(_Correlation, _Msg,
+ #state{queue_status = reject_publish,
+ cfg = #cfg{}} = State) ->
+ {reject_publish, State};
+enqueue(Correlation, Msg,
+ #state{slow = Slow,
+ cfg = #cfg{block_handler = BlockFun}} = State0) ->
+ Node = pick_server(State0),
{Next, State1} = next_enqueue_seq(State0),
% by default there is no correlation id
Cmd = rabbit_fifo:make_enqueue(self(), Next, Msg),
@@ -159,7 +195,7 @@ enqueue(Correlation, Msg, State0 = #state{slow = Slow,
%% by the {@link handle_ra_event/2. handle_ra_event/2} function.
%%
-spec enqueue(Msg :: term(), State :: state()) ->
- {ok | slow, state()}.
+ {ok | slow | reject_publish, state()}.
enqueue(Msg, State) ->
enqueue(undefined, Msg, State).
@@ -178,8 +214,9 @@ enqueue(Msg, State) ->
Settlement :: settled | unsettled, state()) ->
{ok, {rabbit_fifo:delivery_msg(), non_neg_integer()}
| empty, state()} | {error | timeout, term()}.
-dequeue(ConsumerTag, Settlement, #state{timeout = Timeout} = State0) ->
- Node = pick_node(State0),
+dequeue(ConsumerTag, Settlement,
+ #state{cfg = #cfg{timeout = Timeout}} = State0) ->
+ Node = pick_server(State0),
ConsumerId = consumer_id(ConsumerTag),
case ra:process_command(Node,
rabbit_fifo:make_checkout(ConsumerId,
@@ -189,7 +226,8 @@ dequeue(ConsumerTag, Settlement, #state{timeout = Timeout} = State0) ->
{ok, {dequeue, empty}, Leader} ->
{ok, empty, State0#state{leader = Leader}};
{ok, {dequeue, Msg, NumReady}, Leader} ->
- {ok, {Msg, NumReady}, State0#state{leader = Leader}};
+ {ok, {Msg, NumReady},
+ State0#state{leader = Leader}};
{ok, {error, _} = Err, _Leader} ->
Err;
Err ->
@@ -208,7 +246,7 @@ dequeue(ConsumerTag, Settlement, #state{timeout = Timeout} = State0) ->
-spec settle(rabbit_fifo:consumer_tag(), [rabbit_fifo:msg_id()], state()) ->
{ok, state()}.
settle(ConsumerTag, [_|_] = MsgIds, #state{slow = false} = State0) ->
- Node = pick_node(State0),
+ Node = pick_server(State0),
Cmd = rabbit_fifo:make_settle(consumer_id(ConsumerTag), MsgIds),
case send_command(Node, undefined, Cmd, normal, State0) of
{slow, S} ->
@@ -241,7 +279,7 @@ settle(ConsumerTag, [_|_] = MsgIds,
-spec return(rabbit_fifo:consumer_tag(), [rabbit_fifo:msg_id()], state()) ->
{ok, state()}.
return(ConsumerTag, [_|_] = MsgIds, #state{slow = false} = State0) ->
- Node = pick_node(State0),
+ Node = pick_server(State0),
% TODO: make rabbit_fifo return support lists of message ids
Cmd = rabbit_fifo:make_return(consumer_id(ConsumerTag), MsgIds),
case send_command(Node, undefined, Cmd, normal, State0) of
@@ -275,7 +313,7 @@ return(ConsumerTag, [_|_] = MsgIds,
-spec discard(rabbit_fifo:consumer_tag(), [rabbit_fifo:msg_id()], state()) ->
{ok | slow, state()}.
discard(ConsumerTag, [_|_] = MsgIds, #state{slow = false} = State0) ->
- Node = pick_node(State0),
+ Node = pick_server(State0),
Cmd = rabbit_fifo:make_discard(consumer_id(ConsumerTag), MsgIds),
case send_command(Node, undefined, Cmd, normal, State0) of
{slow, S} ->
@@ -363,7 +401,7 @@ credit(ConsumerTag, Credit, Drain,
%% the last received msgid provides us with the delivery count if we
%% add one as it is 0 indexed
C = maps:get(ConsumerTag, CDels, #consumer{last_msg_id = -1}),
- Node = pick_node(State0),
+ Node = pick_server(State0),
Cmd = rabbit_fifo:make_credit(ConsumerId, Credit,
C#consumer.last_msg_id + 1, Drain),
case send_command(Node, undefined, Cmd, normal, State0) of
@@ -429,11 +467,11 @@ stat(Leader, Timeout) ->
%% @doc returns the cluster name
-spec cluster_name(state()) -> cluster_name().
-cluster_name(#state{cluster_name = ClusterName}) ->
+cluster_name(#state{cfg = #cfg{cluster_name = ClusterName}}) ->
ClusterName.
-update_machine_state(Node, Conf) ->
- case ra:process_command(Node, rabbit_fifo:make_update_config(Conf)) of
+update_machine_state(Server, Conf) ->
+ case ra:process_command(Server, rabbit_fifo:make_update_config(Conf)) of
{ok, ok, _} ->
ok;
Err ->
@@ -487,10 +525,11 @@ update_machine_state(Node, Conf) ->
{internal, Correlators :: [term()], actions(), state()} |
{rabbit_fifo:client_msg(), state()} | eol.
handle_ra_event(From, {applied, Seqs},
- #state{soft_limit = SftLmt,
- unblock_handler = UnblockFun} = State0) ->
+ #state{cfg = #cfg{soft_limit = SftLmt,
+ unblock_handler = UnblockFun}} = State00) ->
+ State0 = State00#state{leader = From},
{Corrs, Actions, State1} = lists:foldl(fun seq_applied/2,
- {[], [], State0#state{leader = From}},
+ {[], [], State0},
Seqs),
case maps:size(State1#state.pending) < SftLmt of
true when State1#state.slow == true ->
@@ -511,7 +550,7 @@ handle_ra_event(From, {applied, Seqs},
add_command(Cid, discard,
Discards, Acc)))
end, [], State1#state.unsent_commands),
- Node = pick_node(State2),
+ Node = pick_server(State2),
%% send all the settlements and returns
State = lists:foldl(fun (C, S0) ->
case send_command(Node, undefined,
@@ -527,6 +566,10 @@ handle_ra_event(From, {applied, Seqs},
end;
handle_ra_event(From, {machine, {delivery, _ConsumerTag, _} = Del}, State0) ->
handle_delivery(From, Del, State0);
+handle_ra_event(_, {machine, {queue_status, Status}},
+ #state{} = State) ->
+ %% just set the queue status
+ {internal, [], [], State#state{queue_status = Status}};
handle_ra_event(Leader, {machine, leader_change},
#state{leader = Leader} = State) ->
%% leader already known
@@ -543,7 +586,7 @@ handle_ra_event(_From, {rejected, {not_leader, Leader, Seq}}, State0) ->
State1 = State0#state{leader = Leader},
State = resend(Seq, State1),
{internal, [], [], State};
-handle_ra_event(_, timeout, #state{servers = Servers} = State0) ->
+handle_ra_event(_, timeout, #state{cfg = #cfg{servers = Servers}} = State0) ->
case find_leader(Servers) of
undefined ->
%% still no leader, set the timer again
@@ -642,7 +685,6 @@ resend(OldSeq, #state{pending = Pending0, leader = Leader} = State) ->
error ->
State
end.
-
resend_all_pending(#state{pending = Pend} = State) ->
Seqs = lists:sort(maps:keys(Pend)),
lists:foldl(fun resend/2, State, Seqs).
@@ -719,16 +761,19 @@ get_missing_deliveries(Leader, From, To, ConsumerTag) ->
[Leader])
end.
-pick_node(#state{leader = undefined, servers = [N | _]}) ->
+pick_server(#state{leader = undefined,
+ cfg = #cfg{servers = [N | _]}}) ->
%% TODO: pick random rather that first?
N;
-pick_node(#state{leader = Leader}) ->
+pick_server(#state{leader = Leader}) ->
Leader.
% servers sorted by last known leader
-sorted_servers(#state{leader = undefined, servers = Servers}) ->
+sorted_servers(#state{leader = undefined,
+ cfg = #cfg{servers = Servers}}) ->
Servers;
-sorted_servers(#state{leader = Leader, servers = Servers}) ->
+sorted_servers(#state{leader = Leader,
+ cfg = #cfg{servers = Servers}}) ->
[Leader | lists:delete(Leader, Servers)].
next_seq(#state{next_seq = Seq} = State) ->
@@ -742,7 +787,7 @@ consumer_id(ConsumerTag) ->
send_command(Server, Correlation, Command, Priority,
#state{pending = Pending,
- soft_limit = SftLmt} = State0) ->
+ cfg = #cfg{soft_limit = SftLmt}} = State0) ->
{Seq, State} = next_seq(State0),
ok = ra:pipeline_command(Server, Command, Seq, Priority),
Tag = case maps:size(Pending) >= SftLmt of
@@ -767,7 +812,7 @@ add_command(Cid, return, MsgIds, Acc) ->
add_command(Cid, discard, MsgIds, Acc) ->
[rabbit_fifo:make_discard(Cid, MsgIds) | Acc].
-set_timer(#state{servers = [Server | _]} = State) ->
+set_timer(#state{cfg = #cfg{servers = [Server | _]}} = State) ->
Ref = erlang:send_after(?TIMER_TIME, self(),
{ra_event, Server, timeout}),
State#state{timer_state = Ref}.
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index 915cf9d527..22333f6bdf 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -125,6 +125,12 @@ declare(Q) when ?amqqueue_is_quorum(Q) ->
|| ServerId <- members(NewQ)],
case ra:start_cluster(RaConfs) of
{ok, _, _} ->
+ %% TODO: handle error - what should be done if the
+ %% config cannot be updated
+ ok = rabbit_fifo_client:update_machine_state(Id,
+ ra_machine_config(NewQ)),
+ %% force a policy change to ensure the latest config is
+ %% updated even when running the machine version from 0
rabbit_event:notify(queue_created,
[{name, QName},
{durable, Durable},
@@ -152,6 +158,12 @@ ra_machine_config(Q) when ?is_amqqueue(Q) ->
{Name, _} = amqqueue:get_pid(Q),
%% take the minimum value of the policy and the queue arg if present
MaxLength = args_policy_lookup(<<"max-length">>, fun min/2, Q),
+ Overflow = args_policy_lookup(<<"overflow">>,
+ fun (A, B) ->
+ rabbit_log:info("RESOLVE ~p", [{A, B}]),
+ A
+ end , Q),
+ rabbit_log:info("OVERFLOW ~p ARGS ~p", [Overflow, amqqueue:get_arguments(Q)]),
MaxBytes = args_policy_lookup(<<"max-length-bytes">>, fun min/2, Q),
MaxMemoryLength = args_policy_lookup(<<"max-in-memory-length">>, fun min/2, Q),
MaxMemoryBytes = args_policy_lookup(<<"max-in-memory-bytes">>, fun min/2, Q),
@@ -165,7 +177,8 @@ ra_machine_config(Q) when ?is_amqqueue(Q) ->
max_in_memory_length => MaxMemoryLength,
max_in_memory_bytes => MaxMemoryBytes,
single_active_consumer_on => single_active_consumer_on(Q),
- delivery_limit => DeliveryLimit
+ delivery_limit => DeliveryLimit,
+ overflow_strategy => overflow(Overflow, drop_head)
}.
single_active_consumer_on(Q) ->
@@ -292,8 +305,13 @@ filter_quorum_critical(Queues, ReplicaStates) ->
-spec is_policy_applicable(amqqueue:amqqueue(), any()) -> boolean().
is_policy_applicable(_Q, Policy) ->
- Applicable = [<<"max-length">>, <<"max-length-bytes">>, <<"max-in-memory-length">>,
- <<"max-in-memory-bytes">>, <<"delivery-limit">>, <<"dead-letter-exchange">>,
+ Applicable = [<<"max-length">>,
+ <<"max-length-bytes">>,
+ <<"x-overflow">>,
+ <<"max-in-memory-length">>,
+ <<"max-in-memory-bytes">>,
+ <<"delivery-limit">>,
+ <<"dead-letter-exchange">>,
<<"dead-letter-routing-key">>],
lists:all(fun({P, _}) ->
lists:member(P, Applicable)
@@ -694,13 +712,29 @@ stateless_deliver(ServerId, Delivery) ->
-spec deliver(Confirm :: boolean(), rabbit_types:delivery(),
rabbit_fifo_client:state()) ->
- {ok | slow, rabbit_fifo_client:state()}.
+ {ok | slow, rabbit_fifo_client:state()} |
+ {reject_publish, rabbit_fifo_client:state()}.
deliver(false, Delivery, QState0) ->
- rabbit_fifo_client:enqueue(Delivery#delivery.message, QState0);
+ case rabbit_fifo_client:enqueue(Delivery#delivery.message, QState0) of
+ {ok, _} = Res -> Res;
+ {slow, _} = Res -> Res;
+ {reject_publish, State} ->
+ {ok, State}
+ end;
deliver(true, Delivery, QState0) ->
- rabbit_fifo_client:enqueue(Delivery#delivery.msg_seq_no,
- Delivery#delivery.message, QState0).
+ Seq = Delivery#delivery.msg_seq_no,
+ case rabbit_fifo_client:enqueue(Delivery#delivery.msg_seq_no,
+ Delivery#delivery.message, QState0) of
+ {ok, _} = Res -> Res;
+ {slow, _} = Res -> Res;
+ {reject_publish, State} ->
+ %% TODO: this works fine but once the queue types interface is in
+ %% place it could be replaced with an action or similar to avoid
+ %% self publishing messages.
+ gen_server2:cast(self(), {reject_publish, Seq, undefined}),
+ {ok, State}
+ end.
-spec info(amqqueue:amqqueue()) -> rabbit_types:infos().
@@ -783,9 +817,9 @@ maybe_delete_data_dir(UId) ->
ok
end.
-policy_changed(QName, Node) ->
+policy_changed(QName, Server) ->
{ok, Q} = rabbit_amqqueue:lookup(QName),
- rabbit_fifo_client:update_machine_state(Node, ra_machine_config(Q)).
+ rabbit_fifo_client:update_machine_state(Server, ra_machine_config(Q)).
-spec cluster_state(Name :: atom()) -> 'down' | 'recovering' | 'running'.
@@ -1321,7 +1355,7 @@ maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg).
check_invalid_arguments(QueueName, Args) ->
Keys = [<<"x-expires">>, <<"x-message-ttl">>,
- <<"x-max-priority">>, <<"x-queue-mode">>, <<"x-overflow">>],
+ <<"x-max-priority">>, <<"x-queue-mode">>],
[case rabbit_misc:table_lookup(Args, Key) of
undefined -> ok;
_TypeVal -> rabbit_misc:protocol_error(
@@ -1413,3 +1447,7 @@ get_nodes(Q) when ?is_amqqueue(Q) ->
update_type_state(Q, Fun) when ?is_amqqueue(Q) ->
Ts = amqqueue:get_type_state(Q),
amqqueue:set_type_state(Q, Fun(Ts)).
+
+overflow(undefined, Def) -> Def;
+overflow(<<"reject-publish">>, _Def) -> reject_publish;
+overflow(<<"drop-head">>, _Def) -> drop_head.
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl
index be3b46fbfb..ccb1e1f4d8 100644
--- a/test/quorum_queue_SUITE.erl
+++ b/test/quorum_queue_SUITE.erl
@@ -113,6 +113,7 @@ all_tests() ->
subscribe_redelivery_count,
message_bytes_metrics,
queue_length_limit_drop_head,
+ queue_length_limit_reject_publish,
subscribe_redelivery_limit,
subscribe_redelivery_policy,
subscribe_redelivery_limit_with_dead_letter,
@@ -612,14 +613,16 @@ publish_confirm(Ch, QName) ->
publish(Ch, QName),
amqp_channel:register_confirm_handler(Ch, self()),
ct:pal("waiting for confirms from ~s", [QName]),
- ok = receive
- #'basic.ack'{} -> ok;
- #'basic.nack'{} -> fail
- after 2500 ->
- exit(confirm_timeout)
- end,
- ct:pal("CONFIRMED! ~s", [QName]),
- ok.
+ receive
+ #'basic.ack'{} ->
+ ct:pal("CONFIRMED! ~s", [QName]),
+ ok;
+ #'basic.nack'{} ->
+ ct:pal("NOT CONFIRMED! ~s", [QName]),
+ fail
+ after 2500 ->
+ exit(confirm_timeout)
+ end.
publish_and_restart(Config) ->
%% Test the node restart with both types of queues (quorum and classic) to
@@ -1244,13 +1247,13 @@ simple_confirm_availability_on_leader_change(Config) ->
%% open a channel to another node
Ch = rabbit_ct_client_helpers:open_channel(Config, Node1),
#'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
- publish_confirm(Ch, QQ),
+ ok = publish_confirm(Ch, QQ),
%% stop the node hosting the leader
ok = rabbit_ct_broker_helpers:stop_node(Config, Node2),
%% this should not fail as the channel should detect the new leader and
%% resend to that
- publish_confirm(Ch, QQ),
+ ok = publish_confirm(Ch, QQ),
ok = rabbit_ct_broker_helpers:start_node(Config, Node2),
ok.
@@ -1270,7 +1273,7 @@ confirm_availability_on_leader_change(Config) ->
Ch = rabbit_ct_client_helpers:open_channel(Config, Node1),
#'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
ConfirmLoop = fun Loop() ->
- publish_confirm(Ch, QQ),
+ ok = publish_confirm(Ch, QQ),
receive {done, P} ->
P ! done,
ok
@@ -2020,6 +2023,35 @@ queue_length_limit_drop_head(Config) ->
amqp_channel:call(Ch, #'basic.get'{queue = QQ,
no_ack = true})).
+queue_length_limit_reject_publish(Config) ->
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ RaName = ra_name(QQ),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
+ {<<"x-max-length">>, long, 1},
+ {<<"x-overflow">>, longstr, <<"reject-publish">>}])),
+
+ #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
+ ok = publish_confirm(Ch, QQ),
+ ok = publish_confirm(Ch, QQ),
+ %% give the channel some time to process the async reject_publish notification
+ %% now that we are over the limit it should start failing
+ wait_for_messages_total(Servers, RaName, 2),
+ fail = publish_confirm(Ch, QQ),
+ %% remove all messages
+ ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = _}},
+ amqp_channel:call(Ch, #'basic.get'{queue = QQ,
+ no_ack = true})),
+ ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = _}},
+ amqp_channel:call(Ch, #'basic.get'{queue = QQ,
+ no_ack = true})),
+ %% publish should be allowed again now
+ ok = publish_confirm(Ch, QQ),
+ ok.
+
queue_length_in_memory_limit_basic_get(Config) ->
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
diff --git a/test/rabbit_fifo_SUITE.erl b/test/rabbit_fifo_SUITE.erl
index e3dfb29e7d..ceab563865 100644
--- a/test/rabbit_fifo_SUITE.erl
+++ b/test/rabbit_fifo_SUITE.erl
@@ -59,19 +59,24 @@ end_per_testcase(_TestCase, _Config) ->
?ASSERT_EFF(EfxPat, true, Effects)).
-define(ASSERT_EFF(EfxPat, Guard, Effects),
- ?assert(lists:any(fun (EfxPat) when Guard -> true;
- (_) -> false
- end, Effects))).
+ ?assert(lists:any(fun (EfxPat) when Guard -> true;
+ (_) -> false
+ end, Effects))).
-define(ASSERT_NO_EFF(EfxPat, Effects),
- ?assert(not lists:any(fun (EfxPat) -> true;
- (_) -> false
- end, Effects))).
+ ?assert(not lists:any(fun (EfxPat) -> true;
+ (_) -> false
+ end, Effects))).
+
+-define(ASSERT_NO_EFF(EfxPat, Guard, Effects),
+ ?assert(not lists:any(fun (EfxPat) when Guard -> true;
+ (_) -> false
+ end, Effects))).
-define(assertNoEffect(EfxPat, Effects),
- ?assert(not lists:any(fun (EfxPat) -> true;
- (_) -> false
- end, Effects))).
+ ?assert(not lists:any(fun (EfxPat) -> true;
+ (_) -> false
+ end, Effects))).
test_init(Name) ->
init(#{name => Name,
@@ -1258,6 +1263,99 @@ single_active_with_credited_test(_) ->
State3#rabbit_fifo.waiting_consumers),
ok.
+
+register_enqueuer_test(_) ->
+ State0 = init(#{name => ?FUNCTION_NAME,
+ queue_resource => rabbit_misc:r("/", queue,
+ atom_to_binary(?FUNCTION_NAME, utf8)),
+ max_length => 2,
+ overflow_strategy => reject_publish}),
+ %% simply registering should be ok when we're below limit
+ Pid1 = test_util:fake_pid(node()),
+ {State1, ok, [_]} = apply(meta(1), make_register_enqueuer(Pid1), State0),
+
+ {State2, ok, _} = apply(meta(2), rabbit_fifo:make_enqueue(Pid1, 1, one), State1),
+ %% registering another enqueuer should be ok
+ Pid2 = test_util:fake_pid(node()),
+ {State3, ok, [_]} = apply(meta(3), make_register_enqueuer(Pid2), State2),
+
+ {State4, ok, _} = apply(meta(4), rabbit_fifo:make_enqueue(Pid1, 2, two), State3),
+ {State5, ok, Efx} = apply(meta(5), rabbit_fifo:make_enqueue(Pid1, 3, three), State4),
+ % ct:pal("Efx ~p", [Efx]),
+ %% validate all registered enqueuers are notified of overflow state
+ ?ASSERT_EFF({send_msg, P, {queue_status, reject_publish}, [ra_event]}, P == Pid1, Efx),
+ ?ASSERT_EFF({send_msg, P, {queue_status, reject_publish}, [ra_event]}, P == Pid2, Efx),
+
+ %% this time, registry should return reject_publish
+ {State6, reject_publish, [_]} = apply(meta(6), make_register_enqueuer(
+ test_util:fake_pid(node())), State5),
+ ?assertMatch(#{num_enqueuers := 3}, rabbit_fifo:overview(State6)),
+
+
+ %% remove two messages this should make the queue fall below the 0.8 limit
+ {State7, {dequeue, _, _}, _Efx7} =
+ apply(meta(7),
+ rabbit_fifo:make_checkout(<<"a">>, {dequeue, settled}, #{}), State6),
+ ct:pal("Efx7 ~p", [_Efx7]),
+ {State8, {dequeue, _, _}, Efx8} =
+ apply(meta(8),
+ rabbit_fifo:make_checkout(<<"a">>, {dequeue, settled}, #{}), State7),
+ ct:pal("Efx8 ~p", [Efx8]),
+ %% validate all registered enqueuers are notified of overflow state
+ ?ASSERT_EFF({send_msg, P, {queue_status, go}, [ra_event]}, P == Pid1, Efx8),
+ ?ASSERT_EFF({send_msg, P, {queue_status, go}, [ra_event]}, P == Pid2, Efx8),
+ {_State9, {dequeue, _, _}, Efx9} =
+ apply(meta(9),
+ rabbit_fifo:make_checkout(<<"a">>, {dequeue, settled}, #{}), State8),
+ ?ASSERT_NO_EFF({send_msg, P, go, [ra_event]}, P == Pid1, Efx9),
+ ?ASSERT_NO_EFF({send_msg, P, go, [ra_event]}, P == Pid2, Efx9),
+ ok.
+
+reject_publish_purge_test(_) ->
+ State0 = init(#{name => ?FUNCTION_NAME,
+ queue_resource => rabbit_misc:r("/", queue,
+ atom_to_binary(?FUNCTION_NAME, utf8)),
+ max_length => 2,
+ overflow_strategy => reject_publish}),
+ %% simply registering should be ok when we're below limit
+ Pid1 = test_util:fake_pid(node()),
+ {State1, ok, [_]} = apply(meta(1), make_register_enqueuer(Pid1), State0),
+ {State2, ok, _} = apply(meta(2), rabbit_fifo:make_enqueue(Pid1, 1, one), State1),
+ {State3, ok, _} = apply(meta(3), rabbit_fifo:make_enqueue(Pid1, 2, two), State2),
+ {State4, ok, Efx} = apply(meta(4), rabbit_fifo:make_enqueue(Pid1, 3, three), State3),
+ % ct:pal("Efx ~p", [Efx]),
+ ?ASSERT_EFF({send_msg, P, {queue_status, reject_publish}, [ra_event]}, P == Pid1, Efx),
+ {_State5, {purge, 3}, Efx1} = apply(meta(5), rabbit_fifo:make_purge(), State4),
+ ?ASSERT_EFF({send_msg, P, {queue_status, go}, [ra_event]}, P == Pid1, Efx1),
+ ok.
+
+reject_publish_applied_after_limit_test(_) ->
+ InitConf = #{name => ?FUNCTION_NAME,
+ queue_resource => rabbit_misc:r("/", queue,
+ atom_to_binary(?FUNCTION_NAME, utf8))
+ },
+ State0 = init(InitConf),
+ %% simply registering should be ok when we're below limit
+ Pid1 = test_util:fake_pid(node()),
+ {State1, ok, [_]} = apply(meta(1), make_register_enqueuer(Pid1), State0),
+ {State2, ok, _} = apply(meta(2), rabbit_fifo:make_enqueue(Pid1, 1, one), State1),
+ {State3, ok, _} = apply(meta(3), rabbit_fifo:make_enqueue(Pid1, 2, two), State2),
+ {State4, ok, Efx} = apply(meta(4), rabbit_fifo:make_enqueue(Pid1, 3, three), State3),
+ % ct:pal("Efx ~p", [Efx]),
+ ?ASSERT_NO_EFF({send_msg, P, {queue_status, reject_publish}, [ra_event]}, P == Pid1, Efx),
+ %% apply new config
+ Conf = #{name => ?FUNCTION_NAME,
+ queue_resource => rabbit_misc:r("/", queue,
+ atom_to_binary(?FUNCTION_NAME, utf8)),
+ max_length => 2,
+ overflow_strategy => reject_publish
+ },
+ {State5, ok, Efx1} = apply(meta(5), rabbit_fifo:make_update_config(Conf), State4),
+ ?ASSERT_EFF({send_msg, P, {queue_status, reject_publish}, [ra_event]}, P == Pid1, Efx1),
+ Pid2 = test_util:fake_pid(node()),
+ {_State6, reject_publish, _} = apply(meta(1), make_register_enqueuer(Pid2), State5),
+ ok.
+
purge_nodes_test(_) ->
Node = purged@node,
ThisNode = node(),
@@ -1412,6 +1510,7 @@ machine_version_test(_) ->
%% Utility
init(Conf) -> rabbit_fifo:init(Conf).
+make_register_enqueuer(Pid) -> rabbit_fifo:make_register_enqueuer(Pid).
apply(Meta, Entry, State) -> rabbit_fifo:apply(Meta, Entry, State).
init_aux(Conf) -> rabbit_fifo:init_aux(Conf).
handle_aux(S, T, C, A, L, M) -> rabbit_fifo:handle_aux(S, T, C, A, L, M).
diff --git a/test/rabbit_fifo_prop_SUITE.erl b/test/rabbit_fifo_prop_SUITE.erl
index 23522e71f9..dd2c7154d0 100644
--- a/test/rabbit_fifo_prop_SUITE.erl
+++ b/test/rabbit_fifo_prop_SUITE.erl
@@ -479,15 +479,17 @@ test_run_log(_Config) ->
snapshots(_Config) ->
run_proper(
fun () ->
- ?FORALL({Length, Bytes, SingleActiveConsumer, DeliveryLimit, InMemoryLength,
- InMemoryBytes},
- frequency([{10, {0, 0, false, 0, 0, 0}},
+ ?FORALL({Length, Bytes, SingleActiveConsumer,
+ DeliveryLimit, InMemoryLength, InMemoryBytes,
+ Overflow},
+ frequency([{10, {0, 0, false, 0, 0, 0, drop_head}},
{5, {oneof([range(1, 10), undefined]),
oneof([range(1, 1000), undefined]),
boolean(),
oneof([range(1, 3), undefined]),
oneof([range(1, 10), undefined]),
- oneof([range(1, 1000), undefined])
+ oneof([range(1, 1000), undefined]),
+ oneof([drop_head, reject_publish])
}}]),
begin
Config = config(?FUNCTION_NAME,
@@ -496,7 +498,8 @@ snapshots(_Config) ->
SingleActiveConsumer,
DeliveryLimit,
InMemoryLength,
- InMemoryBytes),
+ InMemoryBytes,
+ Overflow),
?FORALL(O, ?LET(Ops, log_gen(256), expand(Ops, Config)),
collect({log_size, length(O)},
snapshots_prop(Config, O)))
@@ -681,6 +684,11 @@ max_length(_Config) ->
config(Name, Length, Bytes, SingleActive, DeliveryLimit,
InMemoryLength, InMemoryBytes) ->
+config(Name, Length, Bytes, SingleActive, DeliveryLimit,
+ InMemoryLength, InMemoryBytes, drop_head).
+
+config(Name, Length, Bytes, SingleActive, DeliveryLimit,
+ InMemoryLength, InMemoryBytes, Overflow) ->
#{name => Name,
max_length => map_max(Length),
max_bytes => map_max(Bytes),
@@ -688,7 +696,8 @@ config(Name, Length, Bytes, SingleActive, DeliveryLimit,
single_active_consumer_on => SingleActive,
delivery_limit => map_max(DeliveryLimit),
max_in_memory_length => map_max(InMemoryLength),
- max_in_memory_bytes => map_max(InMemoryBytes)}.
+ max_in_memory_bytes => map_max(InMemoryBytes),
+ overflow_strategy => Overflow}.
map_max(0) -> undefined;
map_max(N) -> N.