summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_fifo.erl209
-rw-r--r--src/rabbit_fifo.hrl12
-rw-r--r--src/rabbit_fifo_client.erl133
-rw-r--r--src/rabbit_quorum_queue.erl58
-rw-r--r--test/quorum_queue_SUITE.erl54
-rw-r--r--test/rabbit_fifo_SUITE.erl117
-rw-r--r--test/rabbit_fifo_prop_SUITE.erl21
7 files changed, 461 insertions, 143 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index 9ce39f5a64..9a2e3f3dc3 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -50,6 +50,7 @@
%% protocol helpers
make_enqueue/3,
+ make_register_enqueuer/1,
make_checkout/3,
make_settle/2,
make_return/2,
@@ -64,6 +65,7 @@
-record(enqueue, {pid :: option(pid()),
seq :: option(msg_seqno()),
msg :: raw_msg()}).
+-record(register_enqueuer, {pid :: pid()}).
-record(checkout, {consumer_id :: consumer_id(),
spec :: checkout_spec(),
meta :: consumer_meta()}).
@@ -83,6 +85,7 @@
-opaque protocol() ::
#enqueue{} |
+ #register_enqueuer{} |
#checkout{} |
#settle{} |
#return{} |
@@ -164,9 +167,27 @@ zero(_) ->
-spec apply(ra_machine:command_meta_data(), command(), state()) ->
{state(), Reply :: term(), ra_machine:effects()} |
{state(), Reply :: term()}.
-apply(Metadata, #enqueue{pid = From, seq = Seq,
- msg = RawMsg}, State00) ->
- apply_enqueue(Metadata, From, Seq, RawMsg, State00);
+apply(Meta, #enqueue{pid = From, seq = Seq,
+ msg = RawMsg}, State00) ->
+ apply_enqueue(Meta, From, Seq, RawMsg, State00);
+apply(_Meta, #register_enqueuer{pid = Pid},
+ #?MODULE{enqueuers = Enqueuers0,
+ cfg = #cfg{overflow_strategy = Overflow}} = State0) ->
+
+ State = case maps:is_key(Pid, Enqueuers0) of
+ true ->
+ %% if the enqueuer exits just echo the overflow state
+ State0;
+ false ->
+ State0#?MODULE{enqueuers = Enqueuers0#{Pid => #enqueuer{}}}
+ end,
+ Res = case is_over_limit(State) of
+ true when Overflow == reject_publish ->
+ reject_publish;
+ _ ->
+ ok
+ end,
+ {State, Res, [{monitor, process, Pid}]};
apply(Meta,
#settle{msg_ids = MsgIds, consumer_id = ConsumerId},
#?MODULE{consumers = Cons0} = State) ->
@@ -215,8 +236,9 @@ apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt,
ServiceQueue0),
Cons = maps:put(ConsumerId, Con1, Cons0),
{State1, ok, Effects} =
- checkout(Meta, State0#?MODULE{service_queue = ServiceQueue,
- consumers = Cons}, []),
+ checkout(Meta, State0,
+ State0#?MODULE{service_queue = ServiceQueue,
+ consumers = Cons}, []),
Response = {send_credit_reply, messages_ready(State1)},
%% by this point all checkouts for the updated credit value
%% should be processed so we can evaluate the drain
@@ -262,7 +284,8 @@ apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt,
apply(_, #checkout{spec = {dequeue, _}},
#?MODULE{cfg = #cfg{consumer_strategy = single_active}} = State0) ->
{State0, {error, unsupported}};
-apply(#{from := From} = Meta, #checkout{spec = {dequeue, Settlement},
+apply(#{index := Index,
+ from := From} = Meta, #checkout{spec = {dequeue, Settlement},
meta = ConsumerMeta,
consumer_id = ConsumerId},
#?MODULE{consumers = Consumers} = State0) ->
@@ -278,57 +301,69 @@ apply(#{from := From} = Meta, #checkout{spec = {dequeue, Settlement},
{once, 1, simple_prefetch},
State0),
{success, _, MsgId, Msg, State2} = checkout_one(State1),
- {State, Effects} = case Settlement of
- unsettled ->
- {_, Pid} = ConsumerId,
- {State2, [{monitor, process, Pid}]};
- settled ->
- %% immediately settle the checkout
- {State3, _, Effects0} =
- apply(Meta, make_settle(ConsumerId, [MsgId]),
- State2),
- {State3, Effects0}
- end,
+ {State4, Effects1} = case Settlement of
+ unsettled ->
+ {_, Pid} = ConsumerId,
+ {State2, [{monitor, process, Pid}]};
+ settled ->
+ %% immediately settle the checkout
+ {State3, _, Effects0} =
+ apply(Meta, make_settle(ConsumerId, [MsgId]),
+ State2),
+ {State3, Effects0}
+ end,
+ {Reply, Effects2} =
case Msg of
{RaftIdx, {Header, 'empty'}} ->
%% TODO add here new log effect with reply
- {State, '$ra_no_reply',
- reply_log_effect(RaftIdx, MsgId, Header, Ready - 1, From)};
+ {'$ra_no_reply',
+ [reply_log_effect(RaftIdx, MsgId, Header, Ready - 1, From) |
+ Effects1]};
_ ->
- {State, {dequeue, {MsgId, Msg}, Ready-1}, Effects}
+ {{dequeue, {MsgId, Msg}, Ready-1}, Effects1}
+
+ end,
+
+ case evaluate_limit(Index, false, State0, State4, Effects2) of
+ {State, true, Effects} ->
+ update_smallest_raft_index(Index, Reply, State, Effects);
+ {State, false, Effects} ->
+ {State, Reply, Effects}
end
end;
apply(Meta, #checkout{spec = cancel, consumer_id = ConsumerId}, State0) ->
{State, Effects} = cancel_consumer(ConsumerId, State0, [], consumer_cancel),
- checkout(Meta, State, Effects);
+ checkout(Meta, State0, State, Effects);
apply(Meta, #checkout{spec = Spec, meta = ConsumerMeta,
consumer_id = {_, Pid} = ConsumerId},
State0) ->
State1 = update_consumer(ConsumerId, ConsumerMeta, Spec, State0),
- checkout(Meta, State1, [{monitor, process, Pid}]);
-apply(#{index := RaftIdx}, #purge{},
+ checkout(Meta, State0, State1, [{monitor, process, Pid}]);
+apply(#{index := Index}, #purge{},
#?MODULE{ra_indexes = Indexes0,
returns = Returns,
messages = Messages} = State0) ->
Total = messages_ready(State0),
Indexes1 = lists:foldl(fun rabbit_fifo_index:delete/2, Indexes0,
- [I || {_, {I, _}} <- lqueue:to_list(Messages)]),
+ [I || {_, {I, _}} <- lqueue:to_list(Messages)]),
Indexes = lists:foldl(fun rabbit_fifo_index:delete/2, Indexes1,
[I || {_, {I, _}} <- lqueue:to_list(Returns)]),
- {State, _, Effects} =
- update_smallest_raft_index(RaftIdx,
- State0#?MODULE{ra_indexes = Indexes,
- messages = lqueue:new(),
- returns = lqueue:new(),
- msg_bytes_enqueue = 0,
- prefix_msgs = {0, [], 0, []},
- msg_bytes_in_memory = 0,
- msgs_ready_in_memory = 0},
- []),
- %% as we're not checking out after a purge (no point) we have to
- %% reverse the effects ourselves
- {State, {purge, Total},
- lists:reverse([garbage_collection | Effects])};
+
+ State1 = State0#?MODULE{ra_indexes = Indexes,
+ messages = lqueue:new(),
+ returns = lqueue:new(),
+ msg_bytes_enqueue = 0,
+ prefix_msgs = {0, [], 0, []},
+ msg_bytes_in_memory = 0,
+ msgs_ready_in_memory = 0},
+ Effects0 = [garbage_collection],
+ Reply = {purge, Total},
+ case evaluate_limit(Index, false, State0, State1, Effects0) of
+ {State, true, Effects} ->
+ update_smallest_raft_index(Index, Reply, State, Effects);
+ {State, false, Effects} ->
+ {State, Reply, Effects}
+ end;
apply(Meta, {down, Pid, noconnection},
#?MODULE{consumers = Cons0,
cfg = #cfg{consumer_strategy = single_active},
@@ -375,7 +410,7 @@ apply(Meta, {down, Pid, noconnection},
(_, E) -> E
end, Enqs0),
Effects = [{monitor, node, Node} | Effects1],
- checkout(Meta, State#?MODULE{enqueuers = Enqs}, Effects);
+ checkout(Meta, State0, State#?MODULE{enqueuers = Enqs}, Effects);
apply(Meta, {down, Pid, noconnection},
#?MODULE{consumers = Cons0,
enqueuers = Enqs0} = State0) ->
@@ -418,10 +453,10 @@ apply(Meta, {down, Pid, noconnection},
_ ->
[{monitor, node, Node}]
end ++ Effects1,
- checkout(Meta, State#?MODULE{enqueuers = Enqs}, Effects);
+ checkout(Meta, State0, State#?MODULE{enqueuers = Enqs}, Effects);
apply(Meta, {down, Pid, _Info}, State0) ->
{State, Effects} = handle_down(Pid, State0),
- checkout(Meta, State, Effects);
+ checkout(Meta, State0, State, Effects);
apply(Meta, {nodeup, Node}, #?MODULE{consumers = Cons0,
enqueuers = Enqs0,
service_queue = SQ0} = State0) ->
@@ -456,7 +491,7 @@ apply(Meta, {nodeup, Node}, #?MODULE{consumers = Cons0,
service_queue = SQ,
waiting_consumers = Waiting},
{State, Effects} = activate_next_consumer(State1, Effects1),
- checkout(Meta, State, Effects);
+ checkout(Meta, State0, State, Effects);
apply(_, {nodedown, _Node}, State) ->
{State, ok};
apply(_, #purge_nodes{nodes = Nodes}, State0) ->
@@ -465,7 +500,7 @@ apply(_, #purge_nodes{nodes = Nodes}, State0) ->
end, {State0, []}, Nodes),
{State, ok, Effects};
apply(Meta, #update_config{config = Conf}, State) ->
- checkout(Meta, update_config(Conf, State), []);
+ checkout(Meta, State, update_config(Conf, State), []);
apply(_Meta, {machine_version, 0, 1}, V0State) ->
State = convert_v0_to_v1(V0State),
{State, ok, []}.
@@ -1011,7 +1046,7 @@ apply_enqueue(#{index := RaftIdx} = Meta, From, Seq, RawMsg, State0) ->
case maybe_enqueue(RaftIdx, From, Seq, RawMsg, [], State0) of
{ok, State1, Effects1} ->
State2 = append_to_master_index(RaftIdx, State1),
- {State, ok, Effects} = checkout(Meta, State2, Effects1),
+ {State, ok, Effects} = checkout(Meta, State0, State2, Effects1),
{maybe_store_dehydrated_state(RaftIdx, State), ok, Effects};
{duplicate, State, Effects} ->
{State, ok, Effects}
@@ -1173,7 +1208,7 @@ return(#{index := IncomingRaftIdx} = Meta, ConsumerId, Returned,
_ ->
{State1, Effects1}
end,
- {State, ok, Effects} = checkout(Meta, State2, Effects3),
+ {State, ok, Effects} = checkout(Meta, State0, State2, Effects3),
update_smallest_raft_index(IncomingRaftIdx, State, Effects).
% used to processes messages that are finished
@@ -1221,7 +1256,7 @@ complete_and_checkout(#{index := IncomingRaftIdx} = Meta, MsgIds, ConsumerId,
Discarded = maps:with(MsgIds, Checked0),
{State2, Effects1} = complete(ConsumerId, Discarded, Con0,
Effects0, State0),
- {State, ok, Effects} = checkout(Meta, State2, Effects1),
+ {State, ok, Effects} = checkout(Meta, State0, State2, Effects1),
update_smallest_raft_index(IncomingRaftIdx, State, Effects).
dead_letter_effects(_Reason, _Discarded,
@@ -1257,7 +1292,10 @@ cancel_consumer_effects(ConsumerId,
[{mod_call, rabbit_quorum_queue,
cancel_consumer_handler, [QName, ConsumerId]} | Effects].
-update_smallest_raft_index(IncomingRaftIdx,
+update_smallest_raft_index(Idx, State, Effects) ->
+ update_smallest_raft_index(Idx, ok, State, Effects).
+
+update_smallest_raft_index(IncomingRaftIdx, Reply,
#?MODULE{ra_indexes = Indexes,
release_cursors = Cursors0} = State0,
Effects) ->
@@ -1267,17 +1305,16 @@ update_smallest_raft_index(IncomingRaftIdx,
% we can forward release_cursor all the way until
% the last received command, hooray
State = State0#?MODULE{release_cursors = lqueue:new()},
- {State, ok, Effects ++ [{release_cursor, IncomingRaftIdx, State}]};
+ {State, Reply, Effects ++ [{release_cursor, IncomingRaftIdx, State}]};
_ ->
Smallest = rabbit_fifo_index:smallest(Indexes),
case find_next_cursor(Smallest, Cursors0) of
{empty, Cursors} ->
- {State0#?MODULE{release_cursors = Cursors},
- ok, Effects};
+ {State0#?MODULE{release_cursors = Cursors}, Reply, Effects};
{Cursor, Cursors} ->
%% we can emit a release cursor when we've passed the smallest
%% release cursor available.
- {State0#?MODULE{release_cursors = Cursors}, ok,
+ {State0#?MODULE{release_cursors = Cursors}, Reply,
Effects ++ [Cursor]}
end
end.
@@ -1382,10 +1419,10 @@ return_all(#?MODULE{consumers = Cons} = State0, Effects0, ConsumerId,
end, {State, Effects0}, Checked).
%% checkout new messages to consumers
-checkout(#{index := Index}, State0, Effects0) ->
+checkout(#{index := Index}, OldState, State0, Effects0) ->
{State1, _Result, Effects1} = checkout0(checkout_one(State0),
Effects0, {#{}, #{}}),
- case evaluate_limit(false, State1, Effects1) of
+ case evaluate_limit(Index, false, OldState, State1, Effects1) of
{State, true, Effects} ->
update_smallest_raft_index(Index, State, Effects);
{State, false, Effects} ->
@@ -1418,16 +1455,54 @@ checkout0({Activity, State0}, Effects0, {SendAcc, LogAcc}) ->
end,
{State0, ok, lists:reverse(Effects1)}.
-evaluate_limit(Result,
+evaluate_limit(_Index, Result, _BeforeState,
#?MODULE{cfg = #cfg{max_length = undefined,
max_bytes = undefined}} = State,
Effects) ->
{State, Result, Effects};
-evaluate_limit(Result, State0, Effects0) ->
+evaluate_limit(Index, Result, BeforeState,
+ #?MODULE{cfg = #cfg{overflow_strategy = Strategy},
+ enqueuers = Enqs0} = State0,
+ Effects0) ->
case is_over_limit(State0) of
- true ->
+ true when Strategy == drop_head ->
{State, Effects} = drop_head(State0, Effects0),
- evaluate_limit(true, State, Effects);
+ evaluate_limit(Index, true, BeforeState, State, Effects);
+ true when Strategy == reject_publish ->
+ %% generate send_msg effect for each enqueuer to let them know
+ %% they need to block
+ {Enqs, Effects} =
+ maps:fold(
+ fun (P, #enqueuer{blocked = undefined} = E0, {Enqs, Acc}) ->
+ E = E0#enqueuer{blocked = Index},
+ {Enqs#{P => E},
+ [{send_msg, P, {queue_status, reject_publish},
+ [ra_event]} | Acc]};
+ (_P, _E, Acc) ->
+ Acc
+ end, {Enqs0, Effects0}, Enqs0),
+ {State0#?MODULE{enqueuers = Enqs}, Result, Effects};
+ false when Strategy == reject_publish ->
+ %% TODO: optimise as this case gets called for every command
+ %% pretty much
+ Before = is_below_soft_limit(BeforeState),
+ case {Before, is_below_soft_limit(State0)} of
+ {false, true} ->
+ %% we have moved below the lower limit which
+ {Enqs, Effects} =
+ maps:fold(
+ fun (P, #enqueuer{} = E0, {Enqs, Acc}) ->
+ E = E0#enqueuer{blocked = undefined},
+ {Enqs#{P => E},
+ [{send_msg, P, {queue_status, go}, [ra_event]}
+ | Acc]};
+ (_P, _E, Acc) ->
+ Acc
+ end, {Enqs0, Effects0}, Enqs0),
+ {State0#?MODULE{enqueuers = Enqs}, Result, Effects};
+ _ ->
+ {State0, Result, Effects0}
+ end;
false ->
{State0, Result, Effects0}
end.
@@ -1753,12 +1828,30 @@ is_over_limit(#?MODULE{cfg = #cfg{max_length = undefined,
is_over_limit(#?MODULE{cfg = #cfg{max_length = MaxLength,
max_bytes = MaxBytes},
msg_bytes_enqueue = BytesEnq} = State) ->
-
messages_ready(State) > MaxLength orelse (BytesEnq > MaxBytes).
+is_below_soft_limit(#?MODULE{cfg = #cfg{max_length = undefined,
+ max_bytes = undefined}}) ->
+ false;
+is_below_soft_limit(#?MODULE{cfg = #cfg{max_length = MaxLength,
+ max_bytes = MaxBytes},
+ msg_bytes_enqueue = BytesEnq} = State) ->
+ is_below(MaxLength, messages_ready(State)) andalso
+ is_below(MaxBytes, BytesEnq).
+
+is_below(undefined, _Num) ->
+ true;
+is_below(Val, Num) when is_integer(Val) andalso is_integer(Num) ->
+ Num =< trunc(Val * ?LOW_LIMIT).
+
-spec make_enqueue(option(pid()), option(msg_seqno()), raw_msg()) -> protocol().
make_enqueue(Pid, Seq, Msg) ->
#enqueue{pid = Pid, seq = Seq, msg = Msg}.
+
+-spec make_register_enqueuer(pid()) -> protocol().
+make_register_enqueuer(Pid) ->
+ #register_enqueuer{pid = Pid}.
+
-spec make_checkout(consumer_id(),
checkout_spec(), consumer_meta()) -> protocol().
make_checkout(ConsumerId, Spec, Meta) ->
diff --git a/src/rabbit_fifo.hrl b/src/rabbit_fifo.hrl
index e337540c35..dc01b6ec1c 100644
--- a/src/rabbit_fifo.hrl
+++ b/src/rabbit_fifo.hrl
@@ -75,6 +75,7 @@
-define(GC_MEM_LIMIT_B, 2000000).
-define(MB, 1048576).
+-define(LOW_LIMIT, 0.8).
-record(consumer,
{meta = #{} :: consumer_meta(),
@@ -105,11 +106,11 @@
% out of order enqueues - sorted list
pending = [] :: [{msg_seqno(), ra:index(), raw_msg()}],
status = up :: up |
- suspected_down |
- %% it is useful to have a record of when this was blocked
- %% so that we can retry sending the block effect if
- %% the publisher did not receive the initial one
- {blocked, At :: ra:index()}
+ suspected_down,
+ %% it is useful to have a record of when this was blocked
+ %% so that we can retry sending the block effect if
+ %% the publisher did not receive the initial one
+ blocked :: undefined | ra:index()
}).
-record(cfg,
@@ -191,5 +192,6 @@
max_bytes => non_neg_integer(),
max_in_memory_length => non_neg_integer(),
max_in_memory_bytes => non_neg_integer(),
+ overflow_strategy => drop_head | reject_publish,
single_active_consumer_on => boolean(),
delivery_limit => non_neg_integer()}.
diff --git a/src/rabbit_fifo_client.erl b/src/rabbit_fifo_client.erl
index 2ac9f6787a..a698f93e9e 100644
--- a/src/rabbit_fifo_client.erl
+++ b/src/rabbit_fifo_client.erl
@@ -53,9 +53,17 @@
-record(consumer, {last_msg_id :: seq() | -1,
delivery_count = 0 :: non_neg_integer()}).
--record(state, {cluster_name :: cluster_name(),
- servers = [] :: [ra:server_id()],
+-record(cfg, {cluster_name :: cluster_name(),
+ servers = [] :: [ra:server_id()],
+ soft_limit = ?SOFT_LIMIT :: non_neg_integer(),
+ block_handler = fun() -> ok end :: fun(() -> term()),
+ unblock_handler = fun() -> ok end :: fun(() -> ok),
+ timeout :: non_neg_integer(),
+ version = 0 :: non_neg_integer()}).
+
+-record(state, {cfg :: #cfg{},
leader :: undefined | ra:server_id(),
+ queue_status :: undefined | go | reject_publish,
next_seq = 0 :: seq(),
%% Last applied is initialise to -1 to note that no command has yet been
%% applied, but allowing to resend messages if the first ones on the sequence
@@ -66,15 +74,11 @@
slow = false :: boolean(),
unsent_commands = #{} :: #{rabbit_fifo:consumer_id() =>
{[seq()], [seq()], [seq()]}},
- soft_limit = ?SOFT_LIMIT :: non_neg_integer(),
pending = #{} :: #{seq() =>
{term(), rabbit_fifo:command()}},
consumer_deliveries = #{} :: #{rabbit_fifo:consumer_tag() =>
#consumer{}},
- block_handler = fun() -> ok end :: fun(() -> term()),
- unblock_handler = fun() -> ok end :: fun(() -> ok),
- timer_state :: term(),
- timeout :: non_neg_integer()
+ timer_state :: term()
}).
-opaque state() :: #state{}.
@@ -103,21 +107,22 @@ init(ClusterName, Servers) ->
-spec init(cluster_name(), [ra:server_id()], non_neg_integer()) -> state().
init(ClusterName = #resource{}, Servers, SoftLimit) ->
Timeout = application:get_env(kernel, net_ticktime, 60000) + 5000,
- #state{cluster_name = ClusterName,
- servers = Servers,
- soft_limit = SoftLimit,
- timeout = Timeout}.
+ #state{cfg = #cfg{cluster_name = ClusterName,
+ servers = Servers,
+ soft_limit = SoftLimit,
+ timeout = Timeout}}.
-spec init(cluster_name(), [ra:server_id()], non_neg_integer(), fun(() -> ok),
fun(() -> ok)) -> state().
init(ClusterName = #resource{}, Servers, SoftLimit, BlockFun, UnblockFun) ->
Timeout = application:get_env(kernel, net_ticktime, 60000) + 5000,
- #state{cluster_name = ClusterName,
- servers = Servers,
- block_handler = BlockFun,
- unblock_handler = UnblockFun,
- soft_limit = SoftLimit,
- timeout = Timeout}.
+ #state{cfg = #cfg{cluster_name = ClusterName,
+ servers = Servers,
+ block_handler = BlockFun,
+ unblock_handler = UnblockFun,
+ soft_limit = SoftLimit,
+ timeout = Timeout}}.
+
%% @doc Enqueues a message.
%% @param Correlation an arbitrary erlang term used to correlate this
@@ -132,10 +137,41 @@ init(ClusterName = #resource{}, Servers, SoftLimit, BlockFun, UnblockFun) ->
%% SequenceNumber can be correlated to the applied sequence numbers returned
%% by the {@link handle_ra_event/2. handle_ra_event/2} function.
-spec enqueue(Correlation :: term(), Msg :: term(), State :: state()) ->
- {ok | slow, state()}.
-enqueue(Correlation, Msg, State0 = #state{slow = Slow,
- block_handler = BlockFun}) ->
- Node = pick_node(State0),
+ {ok | slow | reject_publish, state()}.
+enqueue(Correlation, Msg,
+ #state{queue_status = undefined,
+ next_enqueue_seq = 1,
+ cfg = #cfg{}} = State0) ->
+ %% it is the first enqueue, check the version
+ {_, Node} = Server = pick_server(State0),
+ State =
+ case rpc:call(Node, rabbit_fifo, version, []) of
+ 0 ->
+ %% the leader is running the old version
+ %% so we can't initialize the enqueuer session safely
+ State0#state{queue_status = go};
+ 1 ->
+ %% were running the new version on the leader do sync initialisation
+ %% of enqueuer session
+ Reg = rabbit_fifo:make_register_enqueuer(self()),
+ case ra:process_command(Server, Reg) of
+ {ok, reject_publish, _} ->
+ State0#state{queue_status = reject_publish};
+ {ok, ok, _} ->
+ State0#state{queue_status = go};
+ Err ->
+ exit(Err)
+ end
+ end,
+ enqueue(Correlation, Msg, State);
+enqueue(_Correlation, _Msg,
+ #state{queue_status = reject_publish,
+ cfg = #cfg{}} = State) ->
+ {reject_publish, State};
+enqueue(Correlation, Msg,
+ #state{slow = Slow,
+ cfg = #cfg{block_handler = BlockFun}} = State0) ->
+ Node = pick_server(State0),
{Next, State1} = next_enqueue_seq(State0),
% by default there is no correlation id
Cmd = rabbit_fifo:make_enqueue(self(), Next, Msg),
@@ -159,7 +195,7 @@ enqueue(Correlation, Msg, State0 = #state{slow = Slow,
%% by the {@link handle_ra_event/2. handle_ra_event/2} function.
%%
-spec enqueue(Msg :: term(), State :: state()) ->
- {ok | slow, state()}.
+ {ok | slow | reject_publish, state()}.
enqueue(Msg, State) ->
enqueue(undefined, Msg, State).
@@ -178,8 +214,9 @@ enqueue(Msg, State) ->
Settlement :: settled | unsettled, state()) ->
{ok, {rabbit_fifo:delivery_msg(), non_neg_integer()}
| empty, state()} | {error | timeout, term()}.
-dequeue(ConsumerTag, Settlement, #state{timeout = Timeout} = State0) ->
- Node = pick_node(State0),
+dequeue(ConsumerTag, Settlement,
+ #state{cfg = #cfg{timeout = Timeout}} = State0) ->
+ Node = pick_server(State0),
ConsumerId = consumer_id(ConsumerTag),
case ra:process_command(Node,
rabbit_fifo:make_checkout(ConsumerId,
@@ -189,7 +226,8 @@ dequeue(ConsumerTag, Settlement, #state{timeout = Timeout} = State0) ->
{ok, {dequeue, empty}, Leader} ->
{ok, empty, State0#state{leader = Leader}};
{ok, {dequeue, Msg, NumReady}, Leader} ->
- {ok, {Msg, NumReady}, State0#state{leader = Leader}};
+ {ok, {Msg, NumReady},
+ State0#state{leader = Leader}};
{ok, {error, _} = Err, _Leader} ->
Err;
Err ->
@@ -208,7 +246,7 @@ dequeue(ConsumerTag, Settlement, #state{timeout = Timeout} = State0) ->
-spec settle(rabbit_fifo:consumer_tag(), [rabbit_fifo:msg_id()], state()) ->
{ok, state()}.
settle(ConsumerTag, [_|_] = MsgIds, #state{slow = false} = State0) ->
- Node = pick_node(State0),
+ Node = pick_server(State0),
Cmd = rabbit_fifo:make_settle(consumer_id(ConsumerTag), MsgIds),
case send_command(Node, undefined, Cmd, normal, State0) of
{slow, S} ->
@@ -241,7 +279,7 @@ settle(ConsumerTag, [_|_] = MsgIds,
-spec return(rabbit_fifo:consumer_tag(), [rabbit_fifo:msg_id()], state()) ->
{ok, state()}.
return(ConsumerTag, [_|_] = MsgIds, #state{slow = false} = State0) ->
- Node = pick_node(State0),
+ Node = pick_server(State0),
% TODO: make rabbit_fifo return support lists of message ids
Cmd = rabbit_fifo:make_return(consumer_id(ConsumerTag), MsgIds),
case send_command(Node, undefined, Cmd, normal, State0) of
@@ -275,7 +313,7 @@ return(ConsumerTag, [_|_] = MsgIds,
-spec discard(rabbit_fifo:consumer_tag(), [rabbit_fifo:msg_id()], state()) ->
{ok | slow, state()}.
discard(ConsumerTag, [_|_] = MsgIds, #state{slow = false} = State0) ->
- Node = pick_node(State0),
+ Node = pick_server(State0),
Cmd = rabbit_fifo:make_discard(consumer_id(ConsumerTag), MsgIds),
case send_command(Node, undefined, Cmd, normal, State0) of
{slow, S} ->
@@ -363,7 +401,7 @@ credit(ConsumerTag, Credit, Drain,
%% the last received msgid provides us with the delivery count if we
%% add one as it is 0 indexed
C = maps:get(ConsumerTag, CDels, #consumer{last_msg_id = -1}),
- Node = pick_node(State0),
+ Node = pick_server(State0),
Cmd = rabbit_fifo:make_credit(ConsumerId, Credit,
C#consumer.last_msg_id + 1, Drain),
case send_command(Node, undefined, Cmd, normal, State0) of
@@ -429,11 +467,11 @@ stat(Leader, Timeout) ->
%% @doc returns the cluster name
-spec cluster_name(state()) -> cluster_name().
-cluster_name(#state{cluster_name = ClusterName}) ->
+cluster_name(#state{cfg = #cfg{cluster_name = ClusterName}}) ->
ClusterName.
-update_machine_state(Node, Conf) ->
- case ra:process_command(Node, rabbit_fifo:make_update_config(Conf)) of
+update_machine_state(Server, Conf) ->
+ case ra:process_command(Server, rabbit_fifo:make_update_config(Conf)) of
{ok, ok, _} ->
ok;
Err ->
@@ -487,10 +525,11 @@ update_machine_state(Node, Conf) ->
{internal, Correlators :: [term()], actions(), state()} |
{rabbit_fifo:client_msg(), state()} | eol.
handle_ra_event(From, {applied, Seqs},
- #state{soft_limit = SftLmt,
- unblock_handler = UnblockFun} = State0) ->
+ #state{cfg = #cfg{soft_limit = SftLmt,
+ unblock_handler = UnblockFun}} = State00) ->
+ State0 = State00#state{leader = From},
{Corrs, Actions, State1} = lists:foldl(fun seq_applied/2,
- {[], [], State0#state{leader = From}},
+ {[], [], State0},
Seqs),
case maps:size(State1#state.pending) < SftLmt of
true when State1#state.slow == true ->
@@ -511,7 +550,7 @@ handle_ra_event(From, {applied, Seqs},
add_command(Cid, discard,
Discards, Acc)))
end, [], State1#state.unsent_commands),
- Node = pick_node(State2),
+ Node = pick_server(State2),
%% send all the settlements and returns
State = lists:foldl(fun (C, S0) ->
case send_command(Node, undefined,
@@ -527,6 +566,10 @@ handle_ra_event(From, {applied, Seqs},
end;
handle_ra_event(From, {machine, {delivery, _ConsumerTag, _} = Del}, State0) ->
handle_delivery(From, Del, State0);
+handle_ra_event(_, {machine, {queue_status, Status}},
+ #state{} = State) ->
+ %% just set the queue status
+ {internal, [], [], State#state{queue_status = Status}};
handle_ra_event(Leader, {machine, leader_change},
#state{leader = Leader} = State) ->
%% leader already known
@@ -543,7 +586,7 @@ handle_ra_event(_From, {rejected, {not_leader, Leader, Seq}}, State0) ->
State1 = State0#state{leader = Leader},
State = resend(Seq, State1),
{internal, [], [], State};
-handle_ra_event(_, timeout, #state{servers = Servers} = State0) ->
+handle_ra_event(_, timeout, #state{cfg = #cfg{servers = Servers}} = State0) ->
case find_leader(Servers) of
undefined ->
%% still no leader, set the timer again
@@ -642,7 +685,6 @@ resend(OldSeq, #state{pending = Pending0, leader = Leader} = State) ->
error ->
State
end.
-
resend_all_pending(#state{pending = Pend} = State) ->
Seqs = lists:sort(maps:keys(Pend)),
lists:foldl(fun resend/2, State, Seqs).
@@ -719,16 +761,19 @@ get_missing_deliveries(Leader, From, To, ConsumerTag) ->
[Leader])
end.
-pick_node(#state{leader = undefined, servers = [N | _]}) ->
+pick_server(#state{leader = undefined,
+ cfg = #cfg{servers = [N | _]}}) ->
%% TODO: pick random rather that first?
N;
-pick_node(#state{leader = Leader}) ->
+pick_server(#state{leader = Leader}) ->
Leader.
% servers sorted by last known leader
-sorted_servers(#state{leader = undefined, servers = Servers}) ->
+sorted_servers(#state{leader = undefined,
+ cfg = #cfg{servers = Servers}}) ->
Servers;
-sorted_servers(#state{leader = Leader, servers = Servers}) ->
+sorted_servers(#state{leader = Leader,
+ cfg = #cfg{servers = Servers}}) ->
[Leader | lists:delete(Leader, Servers)].
next_seq(#state{next_seq = Seq} = State) ->
@@ -742,7 +787,7 @@ consumer_id(ConsumerTag) ->
send_command(Server, Correlation, Command, Priority,
#state{pending = Pending,
- soft_limit = SftLmt} = State0) ->
+ cfg = #cfg{soft_limit = SftLmt}} = State0) ->
{Seq, State} = next_seq(State0),
ok = ra:pipeline_command(Server, Command, Seq, Priority),
Tag = case maps:size(Pending) >= SftLmt of
@@ -767,7 +812,7 @@ add_command(Cid, return, MsgIds, Acc) ->
add_command(Cid, discard, MsgIds, Acc) ->
[rabbit_fifo:make_discard(Cid, MsgIds) | Acc].
-set_timer(#state{servers = [Server | _]} = State) ->
+set_timer(#state{cfg = #cfg{servers = [Server | _]}} = State) ->
Ref = erlang:send_after(?TIMER_TIME, self(),
{ra_event, Server, timeout}),
State#state{timer_state = Ref}.
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index 915cf9d527..22333f6bdf 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -125,6 +125,12 @@ declare(Q) when ?amqqueue_is_quorum(Q) ->
|| ServerId <- members(NewQ)],
case ra:start_cluster(RaConfs) of
{ok, _, _} ->
+ %% TODO: handle error - what should be done if the
+ %% config cannot be updated
+ ok = rabbit_fifo_client:update_machine_state(Id,
+ ra_machine_config(NewQ)),
+ %% force a policy change to ensure the latest config is
+ %% updated even when running the machine version from 0
rabbit_event:notify(queue_created,
[{name, QName},
{durable, Durable},
@@ -152,6 +158,12 @@ ra_machine_config(Q) when ?is_amqqueue(Q) ->
{Name, _} = amqqueue:get_pid(Q),
%% take the minimum value of the policy and the queue arg if present
MaxLength = args_policy_lookup(<<"max-length">>, fun min/2, Q),
+ Overflow = args_policy_lookup(<<"overflow">>,
+ fun (A, B) ->
+ rabbit_log:info("RESOLVE ~p", [{A, B}]),
+ A
+ end , Q),
+ rabbit_log:info("OVERFLOW ~p ARGS ~p", [Overflow, amqqueue:get_arguments(Q)]),
MaxBytes = args_policy_lookup(<<"max-length-bytes">>, fun min/2, Q),
MaxMemoryLength = args_policy_lookup(<<"max-in-memory-length">>, fun min/2, Q),
MaxMemoryBytes = args_policy_lookup(<<"max-in-memory-bytes">>, fun min/2, Q),
@@ -165,7 +177,8 @@ ra_machine_config(Q) when ?is_amqqueue(Q) ->
max_in_memory_length => MaxMemoryLength,
max_in_memory_bytes => MaxMemoryBytes,
single_active_consumer_on => single_active_consumer_on(Q),
- delivery_limit => DeliveryLimit
+ delivery_limit => DeliveryLimit,
+ overflow_strategy => overflow(Overflow, drop_head)
}.
single_active_consumer_on(Q) ->
@@ -292,8 +305,13 @@ filter_quorum_critical(Queues, ReplicaStates) ->
-spec is_policy_applicable(amqqueue:amqqueue(), any()) -> boolean().
is_policy_applicable(_Q, Policy) ->
- Applicable = [<<"max-length">>, <<"max-length-bytes">>, <<"max-in-memory-length">>,
- <<"max-in-memory-bytes">>, <<"delivery-limit">>, <<"dead-letter-exchange">>,
+ Applicable = [<<"max-length">>,
+ <<"max-length-bytes">>,
+ <<"x-overflow">>,
+ <<"max-in-memory-length">>,
+ <<"max-in-memory-bytes">>,
+ <<"delivery-limit">>,
+ <<"dead-letter-exchange">>,
<<"dead-letter-routing-key">>],
lists:all(fun({P, _}) ->
lists:member(P, Applicable)
@@ -694,13 +712,29 @@ stateless_deliver(ServerId, Delivery) ->
-spec deliver(Confirm :: boolean(), rabbit_types:delivery(),
rabbit_fifo_client:state()) ->
- {ok | slow, rabbit_fifo_client:state()}.
+ {ok | slow, rabbit_fifo_client:state()} |
+ {reject_publish, rabbit_fifo_client:state()}.
deliver(false, Delivery, QState0) ->
- rabbit_fifo_client:enqueue(Delivery#delivery.message, QState0);
+ case rabbit_fifo_client:enqueue(Delivery#delivery.message, QState0) of
+ {ok, _} = Res -> Res;
+ {slow, _} = Res -> Res;
+ {reject_publish, State} ->
+ {ok, State}
+ end;
deliver(true, Delivery, QState0) ->
- rabbit_fifo_client:enqueue(Delivery#delivery.msg_seq_no,
- Delivery#delivery.message, QState0).
+ Seq = Delivery#delivery.msg_seq_no,
+ case rabbit_fifo_client:enqueue(Delivery#delivery.msg_seq_no,
+ Delivery#delivery.message, QState0) of
+ {ok, _} = Res -> Res;
+ {slow, _} = Res -> Res;
+ {reject_publish, State} ->
+ %% TODO: this works fine but once the queue types interface is in
+ %% place it could be replaced with an action or similar to avoid
+ %% self publishing messages.
+ gen_server2:cast(self(), {reject_publish, Seq, undefined}),
+ {ok, State}
+ end.
-spec info(amqqueue:amqqueue()) -> rabbit_types:infos().
@@ -783,9 +817,9 @@ maybe_delete_data_dir(UId) ->
ok
end.
-policy_changed(QName, Node) ->
+policy_changed(QName, Server) ->
{ok, Q} = rabbit_amqqueue:lookup(QName),
- rabbit_fifo_client:update_machine_state(Node, ra_machine_config(Q)).
+ rabbit_fifo_client:update_machine_state(Server, ra_machine_config(Q)).
-spec cluster_state(Name :: atom()) -> 'down' | 'recovering' | 'running'.
@@ -1321,7 +1355,7 @@ maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg).
check_invalid_arguments(QueueName, Args) ->
Keys = [<<"x-expires">>, <<"x-message-ttl">>,
- <<"x-max-priority">>, <<"x-queue-mode">>, <<"x-overflow">>],
+ <<"x-max-priority">>, <<"x-queue-mode">>],
[case rabbit_misc:table_lookup(Args, Key) of
undefined -> ok;
_TypeVal -> rabbit_misc:protocol_error(
@@ -1413,3 +1447,7 @@ get_nodes(Q) when ?is_amqqueue(Q) ->
update_type_state(Q, Fun) when ?is_amqqueue(Q) ->
Ts = amqqueue:get_type_state(Q),
amqqueue:set_type_state(Q, Fun(Ts)).
+
+overflow(undefined, Def) -> Def;
+overflow(<<"reject-publish">>, _Def) -> reject_publish;
+overflow(<<"drop-head">>, _Def) -> drop_head.
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl
index be3b46fbfb..ccb1e1f4d8 100644
--- a/test/quorum_queue_SUITE.erl
+++ b/test/quorum_queue_SUITE.erl
@@ -113,6 +113,7 @@ all_tests() ->
subscribe_redelivery_count,
message_bytes_metrics,
queue_length_limit_drop_head,
+ queue_length_limit_reject_publish,
subscribe_redelivery_limit,
subscribe_redelivery_policy,
subscribe_redelivery_limit_with_dead_letter,
@@ -612,14 +613,16 @@ publish_confirm(Ch, QName) ->
publish(Ch, QName),
amqp_channel:register_confirm_handler(Ch, self()),
ct:pal("waiting for confirms from ~s", [QName]),
- ok = receive
- #'basic.ack'{} -> ok;
- #'basic.nack'{} -> fail
- after 2500 ->
- exit(confirm_timeout)
- end,
- ct:pal("CONFIRMED! ~s", [QName]),
- ok.
+ receive
+ #'basic.ack'{} ->
+ ct:pal("CONFIRMED! ~s", [QName]),
+ ok;
+ #'basic.nack'{} ->
+ ct:pal("NOT CONFIRMED! ~s", [QName]),
+ fail
+ after 2500 ->
+ exit(confirm_timeout)
+ end.
publish_and_restart(Config) ->
%% Test the node restart with both types of queues (quorum and classic) to
@@ -1244,13 +1247,13 @@ simple_confirm_availability_on_leader_change(Config) ->
%% open a channel to another node
Ch = rabbit_ct_client_helpers:open_channel(Config, Node1),
#'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
- publish_confirm(Ch, QQ),
+ ok = publish_confirm(Ch, QQ),
%% stop the node hosting the leader
ok = rabbit_ct_broker_helpers:stop_node(Config, Node2),
%% this should not fail as the channel should detect the new leader and
%% resend to that
- publish_confirm(Ch, QQ),
+ ok = publish_confirm(Ch, QQ),
ok = rabbit_ct_broker_helpers:start_node(Config, Node2),
ok.
@@ -1270,7 +1273,7 @@ confirm_availability_on_leader_change(Config) ->
Ch = rabbit_ct_client_helpers:open_channel(Config, Node1),
#'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
ConfirmLoop = fun Loop() ->
- publish_confirm(Ch, QQ),
+ ok = publish_confirm(Ch, QQ),
receive {done, P} ->
P ! done,
ok
@@ -2020,6 +2023,35 @@ queue_length_limit_drop_head(Config) ->
amqp_channel:call(Ch, #'basic.get'{queue = QQ,
no_ack = true})).
+queue_length_limit_reject_publish(Config) ->
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ RaName = ra_name(QQ),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
+ {<<"x-max-length">>, long, 1},
+ {<<"x-overflow">>, longstr, <<"reject-publish">>}])),
+
+ #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
+ ok = publish_confirm(Ch, QQ),
+ ok = publish_confirm(Ch, QQ),
+ %% give the channel some time to process the async reject_publish notification
+ %% now that we are over the limit it should start failing
+ wait_for_messages_total(Servers, RaName, 2),
+ fail = publish_confirm(Ch, QQ),
+ %% remove all messages
+ ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = _}},
+ amqp_channel:call(Ch, #'basic.get'{queue = QQ,
+ no_ack = true})),
+ ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = _}},
+ amqp_channel:call(Ch, #'basic.get'{queue = QQ,
+ no_ack = true})),
+ %% publish should be allowed again now
+ ok = publish_confirm(Ch, QQ),
+ ok.
+
queue_length_in_memory_limit_basic_get(Config) ->
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
diff --git a/test/rabbit_fifo_SUITE.erl b/test/rabbit_fifo_SUITE.erl
index e3dfb29e7d..ceab563865 100644
--- a/test/rabbit_fifo_SUITE.erl
+++ b/test/rabbit_fifo_SUITE.erl
@@ -59,19 +59,24 @@ end_per_testcase(_TestCase, _Config) ->
?ASSERT_EFF(EfxPat, true, Effects)).
-define(ASSERT_EFF(EfxPat, Guard, Effects),
- ?assert(lists:any(fun (EfxPat) when Guard -> true;
- (_) -> false
- end, Effects))).
+ ?assert(lists:any(fun (EfxPat) when Guard -> true;
+ (_) -> false
+ end, Effects))).
-define(ASSERT_NO_EFF(EfxPat, Effects),
- ?assert(not lists:any(fun (EfxPat) -> true;
- (_) -> false
- end, Effects))).
+ ?assert(not lists:any(fun (EfxPat) -> true;
+ (_) -> false
+ end, Effects))).
+
+-define(ASSERT_NO_EFF(EfxPat, Guard, Effects),
+ ?assert(not lists:any(fun (EfxPat) when Guard -> true;
+ (_) -> false
+ end, Effects))).
-define(assertNoEffect(EfxPat, Effects),
- ?assert(not lists:any(fun (EfxPat) -> true;
- (_) -> false
- end, Effects))).
+ ?assert(not lists:any(fun (EfxPat) -> true;
+ (_) -> false
+ end, Effects))).
test_init(Name) ->
init(#{name => Name,
@@ -1258,6 +1263,99 @@ single_active_with_credited_test(_) ->
State3#rabbit_fifo.waiting_consumers),
ok.
+
+register_enqueuer_test(_) ->
+ State0 = init(#{name => ?FUNCTION_NAME,
+ queue_resource => rabbit_misc:r("/", queue,
+ atom_to_binary(?FUNCTION_NAME, utf8)),
+ max_length => 2,
+ overflow_strategy => reject_publish}),
+ %% simply registering should be ok when we're below limit
+ Pid1 = test_util:fake_pid(node()),
+ {State1, ok, [_]} = apply(meta(1), make_register_enqueuer(Pid1), State0),
+
+ {State2, ok, _} = apply(meta(2), rabbit_fifo:make_enqueue(Pid1, 1, one), State1),
+ %% register another enqueuer shoudl be ok
+ Pid2 = test_util:fake_pid(node()),
+ {State3, ok, [_]} = apply(meta(3), make_register_enqueuer(Pid2), State2),
+
+ {State4, ok, _} = apply(meta(4), rabbit_fifo:make_enqueue(Pid1, 2, two), State3),
+ {State5, ok, Efx} = apply(meta(5), rabbit_fifo:make_enqueue(Pid1, 3, three), State4),
+ % ct:pal("Efx ~p", [Efx]),
+ %% validate all registered enqueuers are notified of overflow state
+ ?ASSERT_EFF({send_msg, P, {queue_status, reject_publish}, [ra_event]}, P == Pid1, Efx),
+ ?ASSERT_EFF({send_msg, P, {queue_status, reject_publish}, [ra_event]}, P == Pid2, Efx),
+
+ %% this time, registry should return reject_publish
+ {State6, reject_publish, [_]} = apply(meta(6), make_register_enqueuer(
+ test_util:fake_pid(node())), State5),
+ ?assertMatch(#{num_enqueuers := 3}, rabbit_fifo:overview(State6)),
+
+
+ %% remove two messages this should make the queue fall below the 0.8 limit
+ {State7, {dequeue, _, _}, _Efx7} =
+ apply(meta(7),
+ rabbit_fifo:make_checkout(<<"a">>, {dequeue, settled}, #{}), State6),
+ ct:pal("Efx7 ~p", [_Efx7]),
+ {State8, {dequeue, _, _}, Efx8} =
+ apply(meta(8),
+ rabbit_fifo:make_checkout(<<"a">>, {dequeue, settled}, #{}), State7),
+ ct:pal("Efx8 ~p", [Efx8]),
+ %% validate all registered enqueuers are notified of overflow state
+ ?ASSERT_EFF({send_msg, P, {queue_status, go}, [ra_event]}, P == Pid1, Efx8),
+ ?ASSERT_EFF({send_msg, P, {queue_status, go}, [ra_event]}, P == Pid2, Efx8),
+ {_State9, {dequeue, _, _}, Efx9} =
+ apply(meta(9),
+ rabbit_fifo:make_checkout(<<"a">>, {dequeue, settled}, #{}), State8),
+ ?ASSERT_NO_EFF({send_msg, P, go, [ra_event]}, P == Pid1, Efx9),
+ ?ASSERT_NO_EFF({send_msg, P, go, [ra_event]}, P == Pid2, Efx9),
+ ok.
+
+reject_publish_purge_test(_) ->
+ State0 = init(#{name => ?FUNCTION_NAME,
+ queue_resource => rabbit_misc:r("/", queue,
+ atom_to_binary(?FUNCTION_NAME, utf8)),
+ max_length => 2,
+ overflow_strategy => reject_publish}),
+ %% simply registering should be ok when we're below limit
+ Pid1 = test_util:fake_pid(node()),
+ {State1, ok, [_]} = apply(meta(1), make_register_enqueuer(Pid1), State0),
+ {State2, ok, _} = apply(meta(2), rabbit_fifo:make_enqueue(Pid1, 1, one), State1),
+ {State3, ok, _} = apply(meta(3), rabbit_fifo:make_enqueue(Pid1, 2, two), State2),
+ {State4, ok, Efx} = apply(meta(4), rabbit_fifo:make_enqueue(Pid1, 3, three), State3),
+ % ct:pal("Efx ~p", [Efx]),
+ ?ASSERT_EFF({send_msg, P, {queue_status, reject_publish}, [ra_event]}, P == Pid1, Efx),
+ {_State5, {purge, 3}, Efx1} = apply(meta(5), rabbit_fifo:make_purge(), State4),
+ ?ASSERT_EFF({send_msg, P, {queue_status, go}, [ra_event]}, P == Pid1, Efx1),
+ ok.
+
+reject_publish_applied_after_limit_test(_) ->
+ InitConf = #{name => ?FUNCTION_NAME,
+ queue_resource => rabbit_misc:r("/", queue,
+ atom_to_binary(?FUNCTION_NAME, utf8))
+ },
+ State0 = init(InitConf),
+ %% simply registering should be ok when we're below limit
+ Pid1 = test_util:fake_pid(node()),
+ {State1, ok, [_]} = apply(meta(1), make_register_enqueuer(Pid1), State0),
+ {State2, ok, _} = apply(meta(2), rabbit_fifo:make_enqueue(Pid1, 1, one), State1),
+ {State3, ok, _} = apply(meta(3), rabbit_fifo:make_enqueue(Pid1, 2, two), State2),
+ {State4, ok, Efx} = apply(meta(4), rabbit_fifo:make_enqueue(Pid1, 3, three), State3),
+ % ct:pal("Efx ~p", [Efx]),
+ ?ASSERT_NO_EFF({send_msg, P, {queue_status, reject_publish}, [ra_event]}, P == Pid1, Efx),
+ %% apply new config
+ Conf = #{name => ?FUNCTION_NAME,
+ queue_resource => rabbit_misc:r("/", queue,
+ atom_to_binary(?FUNCTION_NAME, utf8)),
+ max_length => 2,
+ overflow_strategy => reject_publish
+ },
+ {State5, ok, Efx1} = apply(meta(5), rabbit_fifo:make_update_config(Conf), State4),
+ ?ASSERT_EFF({send_msg, P, {queue_status, reject_publish}, [ra_event]}, P == Pid1, Efx1),
+ Pid2 = test_util:fake_pid(node()),
+ {_State6, reject_publish, _} = apply(meta(1), make_register_enqueuer(Pid2), State5),
+ ok.
+
purge_nodes_test(_) ->
Node = purged@node,
ThisNode = node(),
@@ -1412,6 +1510,7 @@ machine_version_test(_) ->
%% Utility
init(Conf) -> rabbit_fifo:init(Conf).
+make_register_enqueuer(Pid) -> rabbit_fifo:make_register_enqueuer(Pid).
apply(Meta, Entry, State) -> rabbit_fifo:apply(Meta, Entry, State).
init_aux(Conf) -> rabbit_fifo:init_aux(Conf).
handle_aux(S, T, C, A, L, M) -> rabbit_fifo:handle_aux(S, T, C, A, L, M).
diff --git a/test/rabbit_fifo_prop_SUITE.erl b/test/rabbit_fifo_prop_SUITE.erl
index 23522e71f9..dd2c7154d0 100644
--- a/test/rabbit_fifo_prop_SUITE.erl
+++ b/test/rabbit_fifo_prop_SUITE.erl
@@ -479,15 +479,17 @@ test_run_log(_Config) ->
snapshots(_Config) ->
run_proper(
fun () ->
- ?FORALL({Length, Bytes, SingleActiveConsumer, DeliveryLimit, InMemoryLength,
- InMemoryBytes},
- frequency([{10, {0, 0, false, 0, 0, 0}},
+ ?FORALL({Length, Bytes, SingleActiveConsumer,
+ DeliveryLimit, InMemoryLength, InMemoryBytes,
+ Overflow},
+ frequency([{10, {0, 0, false, 0, 0, 0, drop_head}},
{5, {oneof([range(1, 10), undefined]),
oneof([range(1, 1000), undefined]),
boolean(),
oneof([range(1, 3), undefined]),
oneof([range(1, 10), undefined]),
- oneof([range(1, 1000), undefined])
+ oneof([range(1, 1000), undefined]),
+ oneof([drop_head, reject_publish])
}}]),
begin
Config = config(?FUNCTION_NAME,
@@ -496,7 +498,8 @@ snapshots(_Config) ->
SingleActiveConsumer,
DeliveryLimit,
InMemoryLength,
- InMemoryBytes),
+ InMemoryBytes,
+ Overflow),
?FORALL(O, ?LET(Ops, log_gen(256), expand(Ops, Config)),
collect({log_size, length(O)},
snapshots_prop(Config, O)))
@@ -681,6 +684,11 @@ max_length(_Config) ->
config(Name, Length, Bytes, SingleActive, DeliveryLimit,
InMemoryLength, InMemoryBytes) ->
+config(Name, Length, Bytes, SingleActive, DeliveryLimit,
+ InMemoryLength, InMemoryBytes, drop_head).
+
+config(Name, Length, Bytes, SingleActive, DeliveryLimit,
+ InMemoryLength, InMemoryBytes, Overflow) ->
#{name => Name,
max_length => map_max(Length),
max_bytes => map_max(Bytes),
@@ -688,7 +696,8 @@ config(Name, Length, Bytes, SingleActive, DeliveryLimit,
single_active_consumer_on => SingleActive,
delivery_limit => map_max(DeliveryLimit),
max_in_memory_length => map_max(InMemoryLength),
- max_in_memory_bytes => map_max(InMemoryBytes)}.
+ max_in_memory_bytes => map_max(InMemoryBytes),
+ overflow_strategy => Overflow}.
map_max(0) -> undefined;
map_max(N) -> N.