diff options
| -rw-r--r-- | src/rabbit_fifo.erl | 209 | ||||
| -rw-r--r-- | src/rabbit_fifo.hrl | 12 | ||||
| -rw-r--r-- | src/rabbit_fifo_client.erl | 133 | ||||
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 58 | ||||
| -rw-r--r-- | test/quorum_queue_SUITE.erl | 54 | ||||
| -rw-r--r-- | test/rabbit_fifo_SUITE.erl | 117 | ||||
| -rw-r--r-- | test/rabbit_fifo_prop_SUITE.erl | 21 |
7 files changed, 461 insertions, 143 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index 9ce39f5a64..9a2e3f3dc3 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -50,6 +50,7 @@ %% protocol helpers make_enqueue/3, + make_register_enqueuer/1, make_checkout/3, make_settle/2, make_return/2, @@ -64,6 +65,7 @@ -record(enqueue, {pid :: option(pid()), seq :: option(msg_seqno()), msg :: raw_msg()}). +-record(register_enqueuer, {pid :: pid()}). -record(checkout, {consumer_id :: consumer_id(), spec :: checkout_spec(), meta :: consumer_meta()}). @@ -83,6 +85,7 @@ -opaque protocol() :: #enqueue{} | + #register_enqueuer{} | #checkout{} | #settle{} | #return{} | @@ -164,9 +167,27 @@ zero(_) -> -spec apply(ra_machine:command_meta_data(), command(), state()) -> {state(), Reply :: term(), ra_machine:effects()} | {state(), Reply :: term()}. -apply(Metadata, #enqueue{pid = From, seq = Seq, - msg = RawMsg}, State00) -> - apply_enqueue(Metadata, From, Seq, RawMsg, State00); +apply(Meta, #enqueue{pid = From, seq = Seq, + msg = RawMsg}, State00) -> + apply_enqueue(Meta, From, Seq, RawMsg, State00); +apply(_Meta, #register_enqueuer{pid = Pid}, + #?MODULE{enqueuers = Enqueuers0, + cfg = #cfg{overflow_strategy = Overflow}} = State0) -> + + State = case maps:is_key(Pid, Enqueuers0) of + true -> + %% if the enqueuer exits just echo the overflow state + State0; + false -> + State0#?MODULE{enqueuers = Enqueuers0#{Pid => #enqueuer{}}} + end, + Res = case is_over_limit(State) of + true when Overflow == reject_publish -> + reject_publish; + _ -> + ok + end, + {State, Res, [{monitor, process, Pid}]}; apply(Meta, #settle{msg_ids = MsgIds, consumer_id = ConsumerId}, #?MODULE{consumers = Cons0} = State) -> @@ -215,8 +236,9 @@ apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt, ServiceQueue0), Cons = maps:put(ConsumerId, Con1, Cons0), {State1, ok, Effects} = - checkout(Meta, State0#?MODULE{service_queue = ServiceQueue, - consumers = Cons}, []), + checkout(Meta, State0, + State0#?MODULE{service_queue = ServiceQueue, + consumers = Cons}, []), Response = {send_credit_reply, messages_ready(State1)}, %% by this point all checkouts for the updated credit value %% should be processed so we can evaluate the drain @@ -262,7 +284,8 @@ apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt, apply(_, #checkout{spec = {dequeue, _}}, #?MODULE{cfg = #cfg{consumer_strategy = single_active}} = State0) -> {State0, {error, unsupported}}; -apply(#{from := From} = Meta, #checkout{spec = {dequeue, Settlement}, +apply(#{index := Index, + from := From} = Meta, #checkout{spec = {dequeue, Settlement}, meta = ConsumerMeta, consumer_id = ConsumerId}, #?MODULE{consumers = Consumers} = State0) -> @@ -278,57 +301,69 @@ apply(#{from := From} = Meta, #checkout{spec = {dequeue, Settlement}, {once, 1, simple_prefetch}, State0), {success, _, MsgId, Msg, State2} = checkout_one(State1), - {State, Effects} = case Settlement of - unsettled -> - {_, Pid} = ConsumerId, - {State2, [{monitor, process, Pid}]}; - settled -> - %% immediately settle the checkout - {State3, _, Effects0} = - apply(Meta, make_settle(ConsumerId, [MsgId]), - State2), - {State3, Effects0} - end, + {State4, Effects1} = case Settlement of + unsettled -> + {_, Pid} = ConsumerId, + {State2, [{monitor, process, Pid}]}; + settled -> + %% immediately settle the checkout + {State3, _, Effects0} = + apply(Meta, make_settle(ConsumerId, [MsgId]), + State2), + {State3, Effects0} + end, + {Reply, Effects2} = case Msg of {RaftIdx, {Header, 'empty'}} -> %% TODO add here new log effect with reply - {State, '$ra_no_reply', - reply_log_effect(RaftIdx, MsgId, Header, Ready - 1, From)}; + {'$ra_no_reply', + [reply_log_effect(RaftIdx, MsgId, Header, Ready - 1, From) | + Effects1]}; _ -> - {State, {dequeue, {MsgId, Msg}, Ready-1}, Effects} + {{dequeue, {MsgId, Msg}, Ready-1}, Effects1} + + end, + + case evaluate_limit(Index, false, State0, State4, Effects2) of + {State, true, Effects} -> + update_smallest_raft_index(Index, Reply, State, Effects); + {State, false, Effects} -> + {State, Reply, Effects} end end; apply(Meta, #checkout{spec = cancel, consumer_id = ConsumerId}, State0) -> {State, Effects} = cancel_consumer(ConsumerId, State0, [], consumer_cancel), - checkout(Meta, State, Effects); + checkout(Meta, State0, State, Effects); apply(Meta, #checkout{spec = Spec, meta = ConsumerMeta, consumer_id = {_, Pid} = ConsumerId}, State0) -> State1 = update_consumer(ConsumerId, ConsumerMeta, Spec, State0), - checkout(Meta, State1, [{monitor, process, Pid}]); -apply(#{index := RaftIdx}, #purge{}, + checkout(Meta, State0, State1, [{monitor, process, Pid}]); +apply(#{index := Index}, #purge{}, #?MODULE{ra_indexes = Indexes0, returns = Returns, messages = Messages} = State0) -> Total = messages_ready(State0), Indexes1 = lists:foldl(fun rabbit_fifo_index:delete/2, Indexes0, - [I || {_, {I, _}} <- lqueue:to_list(Messages)]), + [I || {_, {I, _}} <- lqueue:to_list(Messages)]), Indexes = lists:foldl(fun rabbit_fifo_index:delete/2, Indexes1, [I || {_, {I, _}} <- lqueue:to_list(Returns)]), - {State, _, Effects} = - update_smallest_raft_index(RaftIdx, - State0#?MODULE{ra_indexes = Indexes, - messages = lqueue:new(), - returns = lqueue:new(), - msg_bytes_enqueue = 0, - prefix_msgs = {0, [], 0, []}, - msg_bytes_in_memory = 0, - msgs_ready_in_memory = 0}, - []), - %% as we're not checking out after a purge (no point) we have to - %% reverse the effects ourselves - {State, {purge, Total}, - lists:reverse([garbage_collection | Effects])}; + + State1 = State0#?MODULE{ra_indexes = Indexes, + messages = lqueue:new(), + returns = lqueue:new(), + msg_bytes_enqueue = 0, + prefix_msgs = {0, [], 0, []}, + msg_bytes_in_memory = 0, + msgs_ready_in_memory = 0}, + Effects0 = [garbage_collection], + Reply = {purge, Total}, + case evaluate_limit(Index, false, State0, State1, Effects0) of + {State, true, Effects} -> + update_smallest_raft_index(Index, Reply, State, Effects); + {State, false, Effects} -> + {State, Reply, Effects} + end; apply(Meta, {down, Pid, noconnection}, #?MODULE{consumers = Cons0, cfg = #cfg{consumer_strategy = single_active}, @@ -375,7 +410,7 @@ apply(Meta, {down, Pid, noconnection}, (_, E) -> E end, Enqs0), Effects = [{monitor, node, Node} | Effects1], - checkout(Meta, State#?MODULE{enqueuers = Enqs}, Effects); + checkout(Meta, State0, State#?MODULE{enqueuers = Enqs}, Effects); apply(Meta, {down, Pid, noconnection}, #?MODULE{consumers = Cons0, enqueuers = Enqs0} = State0) -> @@ -418,10 +453,10 @@ apply(Meta, {down, Pid, noconnection}, _ -> [{monitor, node, Node}] end ++ Effects1, - checkout(Meta, State#?MODULE{enqueuers = Enqs}, Effects); + checkout(Meta, State0, State#?MODULE{enqueuers = Enqs}, Effects); apply(Meta, {down, Pid, _Info}, State0) -> {State, Effects} = handle_down(Pid, State0), - checkout(Meta, State, Effects); + checkout(Meta, State0, State, Effects); apply(Meta, {nodeup, Node}, #?MODULE{consumers = Cons0, enqueuers = Enqs0, service_queue = SQ0} = State0) -> @@ -456,7 +491,7 @@ apply(Meta, {nodeup, Node}, #?MODULE{consumers = Cons0, service_queue = SQ, waiting_consumers = Waiting}, {State, Effects} = activate_next_consumer(State1, Effects1), - checkout(Meta, State, Effects); + checkout(Meta, State0, State, Effects); apply(_, {nodedown, _Node}, State) -> {State, ok}; apply(_, #purge_nodes{nodes = Nodes}, State0) -> @@ -465,7 +500,7 @@ apply(_, #purge_nodes{nodes = Nodes}, State0) -> end, {State0, []}, Nodes), {State, ok, Effects}; apply(Meta, #update_config{config = Conf}, State) -> - checkout(Meta, update_config(Conf, State), []); + checkout(Meta, State, update_config(Conf, State), []); apply(_Meta, {machine_version, 0, 1}, V0State) -> State = convert_v0_to_v1(V0State), {State, ok, []}. @@ -1011,7 +1046,7 @@ apply_enqueue(#{index := RaftIdx} = Meta, From, Seq, RawMsg, State0) -> case maybe_enqueue(RaftIdx, From, Seq, RawMsg, [], State0) of {ok, State1, Effects1} -> State2 = append_to_master_index(RaftIdx, State1), - {State, ok, Effects} = checkout(Meta, State2, Effects1), + {State, ok, Effects} = checkout(Meta, State0, State2, Effects1), {maybe_store_dehydrated_state(RaftIdx, State), ok, Effects}; {duplicate, State, Effects} -> {State, ok, Effects} @@ -1173,7 +1208,7 @@ return(#{index := IncomingRaftIdx} = Meta, ConsumerId, Returned, _ -> {State1, Effects1} end, - {State, ok, Effects} = checkout(Meta, State2, Effects3), + {State, ok, Effects} = checkout(Meta, State0, State2, Effects3), update_smallest_raft_index(IncomingRaftIdx, State, Effects). % used to processes messages that are finished @@ -1221,7 +1256,7 @@ complete_and_checkout(#{index := IncomingRaftIdx} = Meta, MsgIds, ConsumerId, Discarded = maps:with(MsgIds, Checked0), {State2, Effects1} = complete(ConsumerId, Discarded, Con0, Effects0, State0), - {State, ok, Effects} = checkout(Meta, State2, Effects1), + {State, ok, Effects} = checkout(Meta, State0, State2, Effects1), update_smallest_raft_index(IncomingRaftIdx, State, Effects). dead_letter_effects(_Reason, _Discarded, @@ -1257,7 +1292,10 @@ cancel_consumer_effects(ConsumerId, [{mod_call, rabbit_quorum_queue, cancel_consumer_handler, [QName, ConsumerId]} | Effects]. -update_smallest_raft_index(IncomingRaftIdx, +update_smallest_raft_index(Idx, State, Effects) -> + update_smallest_raft_index(Idx, ok, State, Effects). + +update_smallest_raft_index(IncomingRaftIdx, Reply, #?MODULE{ra_indexes = Indexes, release_cursors = Cursors0} = State0, Effects) -> @@ -1267,17 +1305,16 @@ update_smallest_raft_index(IncomingRaftIdx, % we can forward release_cursor all the way until % the last received command, hooray State = State0#?MODULE{release_cursors = lqueue:new()}, - {State, ok, Effects ++ [{release_cursor, IncomingRaftIdx, State}]}; + {State, Reply, Effects ++ [{release_cursor, IncomingRaftIdx, State}]}; _ -> Smallest = rabbit_fifo_index:smallest(Indexes), case find_next_cursor(Smallest, Cursors0) of {empty, Cursors} -> - {State0#?MODULE{release_cursors = Cursors}, - ok, Effects}; + {State0#?MODULE{release_cursors = Cursors}, Reply, Effects}; {Cursor, Cursors} -> %% we can emit a release cursor when we've passed the smallest %% release cursor available. - {State0#?MODULE{release_cursors = Cursors}, ok, + {State0#?MODULE{release_cursors = Cursors}, Reply, Effects ++ [Cursor]} end end. @@ -1382,10 +1419,10 @@ return_all(#?MODULE{consumers = Cons} = State0, Effects0, ConsumerId, end, {State, Effects0}, Checked). %% checkout new messages to consumers -checkout(#{index := Index}, State0, Effects0) -> +checkout(#{index := Index}, OldState, State0, Effects0) -> {State1, _Result, Effects1} = checkout0(checkout_one(State0), Effects0, {#{}, #{}}), - case evaluate_limit(false, State1, Effects1) of + case evaluate_limit(Index, false, OldState, State1, Effects1) of {State, true, Effects} -> update_smallest_raft_index(Index, State, Effects); {State, false, Effects} -> @@ -1418,16 +1455,54 @@ checkout0({Activity, State0}, Effects0, {SendAcc, LogAcc}) -> end, {State0, ok, lists:reverse(Effects1)}. -evaluate_limit(Result, +evaluate_limit(_Index, Result, _BeforeState, #?MODULE{cfg = #cfg{max_length = undefined, max_bytes = undefined}} = State, Effects) -> {State, Result, Effects}; -evaluate_limit(Result, State0, Effects0) -> +evaluate_limit(Index, Result, BeforeState, + #?MODULE{cfg = #cfg{overflow_strategy = Strategy}, + enqueuers = Enqs0} = State0, + Effects0) -> case is_over_limit(State0) of - true -> + true when Strategy == drop_head -> {State, Effects} = drop_head(State0, Effects0), - evaluate_limit(true, State, Effects); + evaluate_limit(Index, true, BeforeState, State, Effects); + true when Strategy == reject_publish -> + %% generate send_msg effect for each enqueuer to let them know + %% they need to block + {Enqs, Effects} = + maps:fold( + fun (P, #enqueuer{blocked = undefined} = E0, {Enqs, Acc}) -> + E = E0#enqueuer{blocked = Index}, + {Enqs#{P => E}, + [{send_msg, P, {queue_status, reject_publish}, + [ra_event]} | Acc]}; + (_P, _E, Acc) -> + Acc + end, {Enqs0, Effects0}, Enqs0), + {State0#?MODULE{enqueuers = Enqs}, Result, Effects}; + false when Strategy == reject_publish -> + %% TODO: optimise as this case gets called for every command + %% pretty much + Before = is_below_soft_limit(BeforeState), + case {Before, is_below_soft_limit(State0)} of + {false, true} -> + %% we have moved below the lower limit which + {Enqs, Effects} = + maps:fold( + fun (P, #enqueuer{} = E0, {Enqs, Acc}) -> + E = E0#enqueuer{blocked = undefined}, + {Enqs#{P => E}, + [{send_msg, P, {queue_status, go}, [ra_event]} + | Acc]}; + (_P, _E, Acc) -> + Acc + end, {Enqs0, Effects0}, Enqs0), + {State0#?MODULE{enqueuers = Enqs}, Result, Effects}; + _ -> + {State0, Result, Effects0} + end; false -> {State0, Result, Effects0} end. @@ -1753,12 +1828,30 @@ is_over_limit(#?MODULE{cfg = #cfg{max_length = undefined, is_over_limit(#?MODULE{cfg = #cfg{max_length = MaxLength, max_bytes = MaxBytes}, msg_bytes_enqueue = BytesEnq} = State) -> - messages_ready(State) > MaxLength orelse (BytesEnq > MaxBytes). +is_below_soft_limit(#?MODULE{cfg = #cfg{max_length = undefined, + max_bytes = undefined}}) -> + false; +is_below_soft_limit(#?MODULE{cfg = #cfg{max_length = MaxLength, + max_bytes = MaxBytes}, + msg_bytes_enqueue = BytesEnq} = State) -> + is_below(MaxLength, messages_ready(State)) andalso + is_below(MaxBytes, BytesEnq). + +is_below(undefined, _Num) -> + true; +is_below(Val, Num) when is_integer(Val) andalso is_integer(Num) -> + Num =< trunc(Val * ?LOW_LIMIT). + -spec make_enqueue(option(pid()), option(msg_seqno()), raw_msg()) -> protocol(). make_enqueue(Pid, Seq, Msg) -> #enqueue{pid = Pid, seq = Seq, msg = Msg}. + +-spec make_register_enqueuer(pid()) -> protocol(). +make_register_enqueuer(Pid) -> + #register_enqueuer{pid = Pid}. + -spec make_checkout(consumer_id(), checkout_spec(), consumer_meta()) -> protocol(). make_checkout(ConsumerId, Spec, Meta) -> diff --git a/src/rabbit_fifo.hrl b/src/rabbit_fifo.hrl index e337540c35..dc01b6ec1c 100644 --- a/src/rabbit_fifo.hrl +++ b/src/rabbit_fifo.hrl @@ -75,6 +75,7 @@ -define(GC_MEM_LIMIT_B, 2000000). -define(MB, 1048576). +-define(LOW_LIMIT, 0.8). -record(consumer, {meta = #{} :: consumer_meta(), @@ -105,11 +106,11 @@ % out of order enqueues - sorted list pending = [] :: [{msg_seqno(), ra:index(), raw_msg()}], status = up :: up | - suspected_down | - %% it is useful to have a record of when this was blocked - %% so that we can retry sending the block effect if - %% the publisher did not receive the initial one - {blocked, At :: ra:index()} + suspected_down, + %% it is useful to have a record of when this was blocked + %% so that we can retry sending the block effect if + %% the publisher did not receive the initial one + blocked :: undefined | ra:index() }). -record(cfg, @@ -191,5 +192,6 @@ max_bytes => non_neg_integer(), max_in_memory_length => non_neg_integer(), max_in_memory_bytes => non_neg_integer(), + overflow_strategy => drop_head | reject_publish, single_active_consumer_on => boolean(), delivery_limit => non_neg_integer()}. diff --git a/src/rabbit_fifo_client.erl b/src/rabbit_fifo_client.erl index 2ac9f6787a..a698f93e9e 100644 --- a/src/rabbit_fifo_client.erl +++ b/src/rabbit_fifo_client.erl @@ -53,9 +53,17 @@ -record(consumer, {last_msg_id :: seq() | -1, delivery_count = 0 :: non_neg_integer()}). --record(state, {cluster_name :: cluster_name(), - servers = [] :: [ra:server_id()], +-record(cfg, {cluster_name :: cluster_name(), + servers = [] :: [ra:server_id()], + soft_limit = ?SOFT_LIMIT :: non_neg_integer(), + block_handler = fun() -> ok end :: fun(() -> term()), + unblock_handler = fun() -> ok end :: fun(() -> ok), + timeout :: non_neg_integer(), + version = 0 :: non_neg_integer()}). + +-record(state, {cfg :: #cfg{}, leader :: undefined | ra:server_id(), + queue_status :: undefined | go | reject_publish, next_seq = 0 :: seq(), %% Last applied is initialise to -1 to note that no command has yet been %% applied, but allowing to resend messages if the first ones on the sequence @@ -66,15 +74,11 @@ slow = false :: boolean(), unsent_commands = #{} :: #{rabbit_fifo:consumer_id() => {[seq()], [seq()], [seq()]}}, - soft_limit = ?SOFT_LIMIT :: non_neg_integer(), pending = #{} :: #{seq() => {term(), rabbit_fifo:command()}}, consumer_deliveries = #{} :: #{rabbit_fifo:consumer_tag() => #consumer{}}, - block_handler = fun() -> ok end :: fun(() -> term()), - unblock_handler = fun() -> ok end :: fun(() -> ok), - timer_state :: term(), - timeout :: non_neg_integer() + timer_state :: term() }). -opaque state() :: #state{}. @@ -103,21 +107,22 @@ init(ClusterName, Servers) -> -spec init(cluster_name(), [ra:server_id()], non_neg_integer()) -> state(). init(ClusterName = #resource{}, Servers, SoftLimit) -> Timeout = application:get_env(kernel, net_ticktime, 60000) + 5000, - #state{cluster_name = ClusterName, - servers = Servers, - soft_limit = SoftLimit, - timeout = Timeout}. + #state{cfg = #cfg{cluster_name = ClusterName, + servers = Servers, + soft_limit = SoftLimit, + timeout = Timeout}}. -spec init(cluster_name(), [ra:server_id()], non_neg_integer(), fun(() -> ok), fun(() -> ok)) -> state(). init(ClusterName = #resource{}, Servers, SoftLimit, BlockFun, UnblockFun) -> Timeout = application:get_env(kernel, net_ticktime, 60000) + 5000, - #state{cluster_name = ClusterName, - servers = Servers, - block_handler = BlockFun, - unblock_handler = UnblockFun, - soft_limit = SoftLimit, - timeout = Timeout}. + #state{cfg = #cfg{cluster_name = ClusterName, + servers = Servers, + block_handler = BlockFun, + unblock_handler = UnblockFun, + soft_limit = SoftLimit, + timeout = Timeout}}. + %% @doc Enqueues a message. %% @param Correlation an arbitrary erlang term used to correlate this @@ -132,10 +137,41 @@ init(ClusterName = #resource{}, Servers, SoftLimit, BlockFun, UnblockFun) -> %% SequenceNumber can be correlated to the applied sequence numbers returned %% by the {@link handle_ra_event/2. handle_ra_event/2} function. -spec enqueue(Correlation :: term(), Msg :: term(), State :: state()) -> - {ok | slow, state()}. -enqueue(Correlation, Msg, State0 = #state{slow = Slow, - block_handler = BlockFun}) -> - Node = pick_node(State0), + {ok | slow | reject_publish, state()}. +enqueue(Correlation, Msg, + #state{queue_status = undefined, + next_enqueue_seq = 1, + cfg = #cfg{}} = State0) -> + %% it is the first enqueue, check the version + {_, Node} = Server = pick_server(State0), + State = + case rpc:call(Node, rabbit_fifo, version, []) of + 0 -> + %% the leader is running the old version + %% so we can't initialize the enqueuer session safely + State0#state{queue_status = go}; + 1 -> + %% were running the new version on the leader do sync initialisation + %% of enqueuer session + Reg = rabbit_fifo:make_register_enqueuer(self()), + case ra:process_command(Server, Reg) of + {ok, reject_publish, _} -> + State0#state{queue_status = reject_publish}; + {ok, ok, _} -> + State0#state{queue_status = go}; + Err -> + exit(Err) + end + end, + enqueue(Correlation, Msg, State); +enqueue(_Correlation, _Msg, + #state{queue_status = reject_publish, + cfg = #cfg{}} = State) -> + {reject_publish, State}; +enqueue(Correlation, Msg, + #state{slow = Slow, + cfg = #cfg{block_handler = BlockFun}} = State0) -> + Node = pick_server(State0), {Next, State1} = next_enqueue_seq(State0), % by default there is no correlation id Cmd = rabbit_fifo:make_enqueue(self(), Next, Msg), @@ -159,7 +195,7 @@ enqueue(Correlation, Msg, State0 = #state{slow = Slow, %% by the {@link handle_ra_event/2. handle_ra_event/2} function. %% -spec enqueue(Msg :: term(), State :: state()) -> - {ok | slow, state()}. + {ok | slow | reject_publish, state()}. enqueue(Msg, State) -> enqueue(undefined, Msg, State). @@ -178,8 +214,9 @@ enqueue(Msg, State) -> Settlement :: settled | unsettled, state()) -> {ok, {rabbit_fifo:delivery_msg(), non_neg_integer()} | empty, state()} | {error | timeout, term()}. -dequeue(ConsumerTag, Settlement, #state{timeout = Timeout} = State0) -> - Node = pick_node(State0), +dequeue(ConsumerTag, Settlement, + #state{cfg = #cfg{timeout = Timeout}} = State0) -> + Node = pick_server(State0), ConsumerId = consumer_id(ConsumerTag), case ra:process_command(Node, rabbit_fifo:make_checkout(ConsumerId, @@ -189,7 +226,8 @@ dequeue(ConsumerTag, Settlement, #state{timeout = Timeout} = State0) -> {ok, {dequeue, empty}, Leader} -> {ok, empty, State0#state{leader = Leader}}; {ok, {dequeue, Msg, NumReady}, Leader} -> - {ok, {Msg, NumReady}, State0#state{leader = Leader}}; + {ok, {Msg, NumReady}, + State0#state{leader = Leader}}; {ok, {error, _} = Err, _Leader} -> Err; Err -> @@ -208,7 +246,7 @@ dequeue(ConsumerTag, Settlement, #state{timeout = Timeout} = State0) -> -spec settle(rabbit_fifo:consumer_tag(), [rabbit_fifo:msg_id()], state()) -> {ok, state()}. settle(ConsumerTag, [_|_] = MsgIds, #state{slow = false} = State0) -> - Node = pick_node(State0), + Node = pick_server(State0), Cmd = rabbit_fifo:make_settle(consumer_id(ConsumerTag), MsgIds), case send_command(Node, undefined, Cmd, normal, State0) of {slow, S} -> @@ -241,7 +279,7 @@ settle(ConsumerTag, [_|_] = MsgIds, -spec return(rabbit_fifo:consumer_tag(), [rabbit_fifo:msg_id()], state()) -> {ok, state()}. return(ConsumerTag, [_|_] = MsgIds, #state{slow = false} = State0) -> - Node = pick_node(State0), + Node = pick_server(State0), % TODO: make rabbit_fifo return support lists of message ids Cmd = rabbit_fifo:make_return(consumer_id(ConsumerTag), MsgIds), case send_command(Node, undefined, Cmd, normal, State0) of @@ -275,7 +313,7 @@ return(ConsumerTag, [_|_] = MsgIds, -spec discard(rabbit_fifo:consumer_tag(), [rabbit_fifo:msg_id()], state()) -> {ok | slow, state()}. discard(ConsumerTag, [_|_] = MsgIds, #state{slow = false} = State0) -> - Node = pick_node(State0), + Node = pick_server(State0), Cmd = rabbit_fifo:make_discard(consumer_id(ConsumerTag), MsgIds), case send_command(Node, undefined, Cmd, normal, State0) of {slow, S} -> @@ -363,7 +401,7 @@ credit(ConsumerTag, Credit, Drain, %% the last received msgid provides us with the delivery count if we %% add one as it is 0 indexed C = maps:get(ConsumerTag, CDels, #consumer{last_msg_id = -1}), - Node = pick_node(State0), + Node = pick_server(State0), Cmd = rabbit_fifo:make_credit(ConsumerId, Credit, C#consumer.last_msg_id + 1, Drain), case send_command(Node, undefined, Cmd, normal, State0) of @@ -429,11 +467,11 @@ stat(Leader, Timeout) -> %% @doc returns the cluster name -spec cluster_name(state()) -> cluster_name(). -cluster_name(#state{cluster_name = ClusterName}) -> +cluster_name(#state{cfg = #cfg{cluster_name = ClusterName}}) -> ClusterName. -update_machine_state(Node, Conf) -> - case ra:process_command(Node, rabbit_fifo:make_update_config(Conf)) of +update_machine_state(Server, Conf) -> + case ra:process_command(Server, rabbit_fifo:make_update_config(Conf)) of {ok, ok, _} -> ok; Err -> @@ -487,10 +525,11 @@ update_machine_state(Node, Conf) -> {internal, Correlators :: [term()], actions(), state()} | {rabbit_fifo:client_msg(), state()} | eol. handle_ra_event(From, {applied, Seqs}, - #state{soft_limit = SftLmt, - unblock_handler = UnblockFun} = State0) -> + #state{cfg = #cfg{soft_limit = SftLmt, + unblock_handler = UnblockFun}} = State00) -> + State0 = State00#state{leader = From}, {Corrs, Actions, State1} = lists:foldl(fun seq_applied/2, - {[], [], State0#state{leader = From}}, + {[], [], State0}, Seqs), case maps:size(State1#state.pending) < SftLmt of true when State1#state.slow == true -> @@ -511,7 +550,7 @@ handle_ra_event(From, {applied, Seqs}, add_command(Cid, discard, Discards, Acc))) end, [], State1#state.unsent_commands), - Node = pick_node(State2), + Node = pick_server(State2), %% send all the settlements and returns State = lists:foldl(fun (C, S0) -> case send_command(Node, undefined, @@ -527,6 +566,10 @@ handle_ra_event(From, {applied, Seqs}, end; handle_ra_event(From, {machine, {delivery, _ConsumerTag, _} = Del}, State0) -> handle_delivery(From, Del, State0); +handle_ra_event(_, {machine, {queue_status, Status}}, + #state{} = State) -> + %% just set the queue status + {internal, [], [], State#state{queue_status = Status}}; handle_ra_event(Leader, {machine, leader_change}, #state{leader = Leader} = State) -> %% leader already known @@ -543,7 +586,7 @@ handle_ra_event(_From, {rejected, {not_leader, Leader, Seq}}, State0) -> State1 = State0#state{leader = Leader}, State = resend(Seq, State1), {internal, [], [], State}; -handle_ra_event(_, timeout, #state{servers = Servers} = State0) -> +handle_ra_event(_, timeout, #state{cfg = #cfg{servers = Servers}} = State0) -> case find_leader(Servers) of undefined -> %% still no leader, set the timer again @@ -642,7 +685,6 @@ resend(OldSeq, #state{pending = Pending0, leader = Leader} = State) -> error -> State end. - resend_all_pending(#state{pending = Pend} = State) -> Seqs = lists:sort(maps:keys(Pend)), lists:foldl(fun resend/2, State, Seqs). @@ -719,16 +761,19 @@ get_missing_deliveries(Leader, From, To, ConsumerTag) -> [Leader]) end. -pick_node(#state{leader = undefined, servers = [N | _]}) -> +pick_server(#state{leader = undefined, + cfg = #cfg{servers = [N | _]}}) -> %% TODO: pick random rather that first? N; -pick_node(#state{leader = Leader}) -> +pick_server(#state{leader = Leader}) -> Leader. % servers sorted by last known leader -sorted_servers(#state{leader = undefined, servers = Servers}) -> +sorted_servers(#state{leader = undefined, + cfg = #cfg{servers = Servers}}) -> Servers; -sorted_servers(#state{leader = Leader, servers = Servers}) -> +sorted_servers(#state{leader = Leader, + cfg = #cfg{servers = Servers}}) -> [Leader | lists:delete(Leader, Servers)]. next_seq(#state{next_seq = Seq} = State) -> @@ -742,7 +787,7 @@ consumer_id(ConsumerTag) -> send_command(Server, Correlation, Command, Priority, #state{pending = Pending, - soft_limit = SftLmt} = State0) -> + cfg = #cfg{soft_limit = SftLmt}} = State0) -> {Seq, State} = next_seq(State0), ok = ra:pipeline_command(Server, Command, Seq, Priority), Tag = case maps:size(Pending) >= SftLmt of @@ -767,7 +812,7 @@ add_command(Cid, return, MsgIds, Acc) -> add_command(Cid, discard, MsgIds, Acc) -> [rabbit_fifo:make_discard(Cid, MsgIds) | Acc]. -set_timer(#state{servers = [Server | _]} = State) -> +set_timer(#state{cfg = #cfg{servers = [Server | _]}} = State) -> Ref = erlang:send_after(?TIMER_TIME, self(), {ra_event, Server, timeout}), State#state{timer_state = Ref}. diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index 915cf9d527..22333f6bdf 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -125,6 +125,12 @@ declare(Q) when ?amqqueue_is_quorum(Q) -> || ServerId <- members(NewQ)], case ra:start_cluster(RaConfs) of {ok, _, _} -> + %% TODO: handle error - what should be done if the + %% config cannot be updated + ok = rabbit_fifo_client:update_machine_state(Id, + ra_machine_config(NewQ)), + %% force a policy change to ensure the latest config is + %% updated even when running the machine version from 0 rabbit_event:notify(queue_created, [{name, QName}, {durable, Durable}, @@ -152,6 +158,12 @@ ra_machine_config(Q) when ?is_amqqueue(Q) -> {Name, _} = amqqueue:get_pid(Q), %% take the minimum value of the policy and the queue arg if present MaxLength = args_policy_lookup(<<"max-length">>, fun min/2, Q), + Overflow = args_policy_lookup(<<"overflow">>, + fun (A, B) -> + rabbit_log:info("RESOLVE ~p", [{A, B}]), + A + end , Q), + rabbit_log:info("OVERFLOW ~p ARGS ~p", [Overflow, amqqueue:get_arguments(Q)]), MaxBytes = args_policy_lookup(<<"max-length-bytes">>, fun min/2, Q), MaxMemoryLength = args_policy_lookup(<<"max-in-memory-length">>, fun min/2, Q), MaxMemoryBytes = args_policy_lookup(<<"max-in-memory-bytes">>, fun min/2, Q), @@ -165,7 +177,8 @@ ra_machine_config(Q) when ?is_amqqueue(Q) -> max_in_memory_length => MaxMemoryLength, max_in_memory_bytes => MaxMemoryBytes, single_active_consumer_on => single_active_consumer_on(Q), - delivery_limit => DeliveryLimit + delivery_limit => DeliveryLimit, + overflow_strategy => overflow(Overflow, drop_head) }. single_active_consumer_on(Q) -> @@ -292,8 +305,13 @@ filter_quorum_critical(Queues, ReplicaStates) -> -spec is_policy_applicable(amqqueue:amqqueue(), any()) -> boolean(). is_policy_applicable(_Q, Policy) -> - Applicable = [<<"max-length">>, <<"max-length-bytes">>, <<"max-in-memory-length">>, - <<"max-in-memory-bytes">>, <<"delivery-limit">>, <<"dead-letter-exchange">>, + Applicable = [<<"max-length">>, + <<"max-length-bytes">>, + <<"x-overflow">>, + <<"max-in-memory-length">>, + <<"max-in-memory-bytes">>, + <<"delivery-limit">>, + <<"dead-letter-exchange">>, <<"dead-letter-routing-key">>], lists:all(fun({P, _}) -> lists:member(P, Applicable) @@ -694,13 +712,29 @@ stateless_deliver(ServerId, Delivery) -> -spec deliver(Confirm :: boolean(), rabbit_types:delivery(), rabbit_fifo_client:state()) -> - {ok | slow, rabbit_fifo_client:state()}. + {ok | slow, rabbit_fifo_client:state()} | + {reject_publish, rabbit_fifo_client:state()}. deliver(false, Delivery, QState0) -> - rabbit_fifo_client:enqueue(Delivery#delivery.message, QState0); + case rabbit_fifo_client:enqueue(Delivery#delivery.message, QState0) of + {ok, _} = Res -> Res; + {slow, _} = Res -> Res; + {reject_publish, State} -> + {ok, State} + end; deliver(true, Delivery, QState0) -> - rabbit_fifo_client:enqueue(Delivery#delivery.msg_seq_no, - Delivery#delivery.message, QState0). + Seq = Delivery#delivery.msg_seq_no, + case rabbit_fifo_client:enqueue(Delivery#delivery.msg_seq_no, + Delivery#delivery.message, QState0) of + {ok, _} = Res -> Res; + {slow, _} = Res -> Res; + {reject_publish, State} -> + %% TODO: this works fine but once the queue types interface is in + %% place it could be replaced with an action or similar to avoid + %% self publishing messages. + gen_server2:cast(self(), {reject_publish, Seq, undefined}), + {ok, State} + end. -spec info(amqqueue:amqqueue()) -> rabbit_types:infos(). @@ -783,9 +817,9 @@ maybe_delete_data_dir(UId) -> ok end. -policy_changed(QName, Node) -> +policy_changed(QName, Server) -> {ok, Q} = rabbit_amqqueue:lookup(QName), - rabbit_fifo_client:update_machine_state(Node, ra_machine_config(Q)). + rabbit_fifo_client:update_machine_state(Server, ra_machine_config(Q)). -spec cluster_state(Name :: atom()) -> 'down' | 'recovering' | 'running'. @@ -1321,7 +1355,7 @@ maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg). check_invalid_arguments(QueueName, Args) -> Keys = [<<"x-expires">>, <<"x-message-ttl">>, - <<"x-max-priority">>, <<"x-queue-mode">>, <<"x-overflow">>], + <<"x-max-priority">>, <<"x-queue-mode">>], [case rabbit_misc:table_lookup(Args, Key) of undefined -> ok; _TypeVal -> rabbit_misc:protocol_error( @@ -1413,3 +1447,7 @@ get_nodes(Q) when ?is_amqqueue(Q) -> update_type_state(Q, Fun) when ?is_amqqueue(Q) -> Ts = amqqueue:get_type_state(Q), amqqueue:set_type_state(Q, Fun(Ts)). + +overflow(undefined, Def) -> Def; +overflow(<<"reject-publish">>, _Def) -> reject_publish; +overflow(<<"drop-head">>, _Def) -> drop_head. diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl index be3b46fbfb..ccb1e1f4d8 100644 --- a/test/quorum_queue_SUITE.erl +++ b/test/quorum_queue_SUITE.erl @@ -113,6 +113,7 @@ all_tests() -> subscribe_redelivery_count, message_bytes_metrics, queue_length_limit_drop_head, + queue_length_limit_reject_publish, subscribe_redelivery_limit, subscribe_redelivery_policy, subscribe_redelivery_limit_with_dead_letter, @@ -612,14 +613,16 @@ publish_confirm(Ch, QName) -> publish(Ch, QName), amqp_channel:register_confirm_handler(Ch, self()), ct:pal("waiting for confirms from ~s", [QName]), - ok = receive - #'basic.ack'{} -> ok; - #'basic.nack'{} -> fail - after 2500 -> - exit(confirm_timeout) - end, - ct:pal("CONFIRMED! ~s", [QName]), - ok. + receive + #'basic.ack'{} -> + ct:pal("CONFIRMED! ~s", [QName]), + ok; + #'basic.nack'{} -> + ct:pal("NOT CONFIRMED! ~s", [QName]), + fail + after 2500 -> + exit(confirm_timeout) + end. publish_and_restart(Config) -> %% Test the node restart with both types of queues (quorum and classic) to @@ -1244,13 +1247,13 @@ simple_confirm_availability_on_leader_change(Config) -> %% open a channel to another node Ch = rabbit_ct_client_helpers:open_channel(Config, Node1), #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), - publish_confirm(Ch, QQ), + ok = publish_confirm(Ch, QQ), %% stop the node hosting the leader ok = rabbit_ct_broker_helpers:stop_node(Config, Node2), %% this should not fail as the channel should detect the new leader and %% resend to that - publish_confirm(Ch, QQ), + ok = publish_confirm(Ch, QQ), ok = rabbit_ct_broker_helpers:start_node(Config, Node2), ok. @@ -1270,7 +1273,7 @@ confirm_availability_on_leader_change(Config) -> Ch = rabbit_ct_client_helpers:open_channel(Config, Node1), #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), ConfirmLoop = fun Loop() -> - publish_confirm(Ch, QQ), + ok = publish_confirm(Ch, QQ), receive {done, P} -> P ! done, ok @@ -2020,6 +2023,35 @@ queue_length_limit_drop_head(Config) -> amqp_channel:call(Ch, #'basic.get'{queue = QQ, no_ack = true})). +queue_length_limit_reject_publish(Config) -> + [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + RaName = ra_name(QQ), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, + {<<"x-max-length">>, long, 1}, + {<<"x-overflow">>, longstr, <<"reject-publish">>}])), + + #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), + ok = publish_confirm(Ch, QQ), + ok = publish_confirm(Ch, QQ), + %% give the channel some time to process the async reject_publish notification + %% now that we are over the limit it should start failing + wait_for_messages_total(Servers, RaName, 2), + fail = publish_confirm(Ch, QQ), + %% remove all messages + ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = _}}, + amqp_channel:call(Ch, #'basic.get'{queue = QQ, + no_ack = true})), + ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = _}}, + amqp_channel:call(Ch, #'basic.get'{queue = QQ, + no_ack = true})), + %% publish should be allowed again now + ok = publish_confirm(Ch, QQ), + ok. + queue_length_in_memory_limit_basic_get(Config) -> [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), diff --git a/test/rabbit_fifo_SUITE.erl b/test/rabbit_fifo_SUITE.erl index e3dfb29e7d..ceab563865 100644 --- a/test/rabbit_fifo_SUITE.erl +++ b/test/rabbit_fifo_SUITE.erl @@ -59,19 +59,24 @@ end_per_testcase(_TestCase, _Config) -> ?ASSERT_EFF(EfxPat, true, Effects)). -define(ASSERT_EFF(EfxPat, Guard, Effects), - ?assert(lists:any(fun (EfxPat) when Guard -> true; - (_) -> false - end, Effects))). + ?assert(lists:any(fun (EfxPat) when Guard -> true; + (_) -> false + end, Effects))). -define(ASSERT_NO_EFF(EfxPat, Effects), - ?assert(not lists:any(fun (EfxPat) -> true; - (_) -> false - end, Effects))). + ?assert(not lists:any(fun (EfxPat) -> true; + (_) -> false + end, Effects))). + +-define(ASSERT_NO_EFF(EfxPat, Guard, Effects), + ?assert(not lists:any(fun (EfxPat) when Guard -> true; + (_) -> false + end, Effects))). -define(assertNoEffect(EfxPat, Effects), - ?assert(not lists:any(fun (EfxPat) -> true; - (_) -> false - end, Effects))). + ?assert(not lists:any(fun (EfxPat) -> true; + (_) -> false + end, Effects))). test_init(Name) -> init(#{name => Name, @@ -1258,6 +1263,99 @@ single_active_with_credited_test(_) -> State3#rabbit_fifo.waiting_consumers), ok. + +register_enqueuer_test(_) -> + State0 = init(#{name => ?FUNCTION_NAME, + queue_resource => rabbit_misc:r("/", queue, + atom_to_binary(?FUNCTION_NAME, utf8)), + max_length => 2, + overflow_strategy => reject_publish}), + %% simply registering should be ok when we're below limit + Pid1 = test_util:fake_pid(node()), + {State1, ok, [_]} = apply(meta(1), make_register_enqueuer(Pid1), State0), + + {State2, ok, _} = apply(meta(2), rabbit_fifo:make_enqueue(Pid1, 1, one), State1), + %% register another enqueuer shoudl be ok + Pid2 = test_util:fake_pid(node()), + {State3, ok, [_]} = apply(meta(3), make_register_enqueuer(Pid2), State2), + + {State4, ok, _} = apply(meta(4), rabbit_fifo:make_enqueue(Pid1, 2, two), State3), + {State5, ok, Efx} = apply(meta(5), rabbit_fifo:make_enqueue(Pid1, 3, three), State4), + % ct:pal("Efx ~p", [Efx]), + %% validate all registered enqueuers are notified of overflow state + ?ASSERT_EFF({send_msg, P, {queue_status, reject_publish}, [ra_event]}, P == Pid1, Efx), + ?ASSERT_EFF({send_msg, P, {queue_status, reject_publish}, [ra_event]}, P == Pid2, Efx), + + %% this time, registry should return reject_publish + {State6, reject_publish, [_]} = apply(meta(6), make_register_enqueuer( + test_util:fake_pid(node())), State5), + ?assertMatch(#{num_enqueuers := 3}, rabbit_fifo:overview(State6)), + + + %% remove two messages this should make the queue fall below the 0.8 limit + {State7, {dequeue, _, _}, _Efx7} = + apply(meta(7), + rabbit_fifo:make_checkout(<<"a">>, {dequeue, settled}, #{}), State6), + ct:pal("Efx7 ~p", [_Efx7]), + {State8, {dequeue, _, _}, Efx8} = + apply(meta(8), + rabbit_fifo:make_checkout(<<"a">>, {dequeue, settled}, #{}), State7), + ct:pal("Efx8 ~p", [Efx8]), + %% validate all registered enqueuers are notified of overflow state + ?ASSERT_EFF({send_msg, P, {queue_status, go}, [ra_event]}, P == Pid1, Efx8), + ?ASSERT_EFF({send_msg, P, {queue_status, go}, [ra_event]}, P == Pid2, Efx8), + {_State9, {dequeue, _, _}, Efx9} = + apply(meta(9), + rabbit_fifo:make_checkout(<<"a">>, {dequeue, settled}, #{}), State8), + ?ASSERT_NO_EFF({send_msg, P, go, [ra_event]}, P == Pid1, Efx9), + ?ASSERT_NO_EFF({send_msg, P, go, [ra_event]}, P == Pid2, Efx9), + ok. + +reject_publish_purge_test(_) -> + State0 = init(#{name => ?FUNCTION_NAME, + queue_resource => rabbit_misc:r("/", queue, + atom_to_binary(?FUNCTION_NAME, utf8)), + max_length => 2, + overflow_strategy => reject_publish}), + %% simply registering should be ok when we're below limit + Pid1 = test_util:fake_pid(node()), + {State1, ok, [_]} = apply(meta(1), make_register_enqueuer(Pid1), State0), + {State2, ok, _} = apply(meta(2), rabbit_fifo:make_enqueue(Pid1, 1, one), State1), + {State3, ok, _} = apply(meta(3), rabbit_fifo:make_enqueue(Pid1, 2, two), State2), + {State4, ok, Efx} = apply(meta(4), rabbit_fifo:make_enqueue(Pid1, 3, three), State3), + % ct:pal("Efx ~p", [Efx]), + ?ASSERT_EFF({send_msg, P, {queue_status, reject_publish}, [ra_event]}, P == Pid1, Efx), + {_State5, {purge, 3}, Efx1} = apply(meta(5), rabbit_fifo:make_purge(), State4), + ?ASSERT_EFF({send_msg, P, {queue_status, go}, [ra_event]}, P == Pid1, Efx1), + ok. + +reject_publish_applied_after_limit_test(_) -> + InitConf = #{name => ?FUNCTION_NAME, + queue_resource => rabbit_misc:r("/", queue, + atom_to_binary(?FUNCTION_NAME, utf8)) + }, + State0 = init(InitConf), + %% simply registering should be ok when we're below limit + Pid1 = test_util:fake_pid(node()), + {State1, ok, [_]} = apply(meta(1), make_register_enqueuer(Pid1), State0), + {State2, ok, _} = apply(meta(2), rabbit_fifo:make_enqueue(Pid1, 1, one), State1), + {State3, ok, _} = apply(meta(3), rabbit_fifo:make_enqueue(Pid1, 2, two), State2), + {State4, ok, Efx} = apply(meta(4), rabbit_fifo:make_enqueue(Pid1, 3, three), State3), + % ct:pal("Efx ~p", [Efx]), + ?ASSERT_NO_EFF({send_msg, P, {queue_status, reject_publish}, [ra_event]}, P == Pid1, Efx), + %% apply new config + Conf = #{name => ?FUNCTION_NAME, + queue_resource => rabbit_misc:r("/", queue, + atom_to_binary(?FUNCTION_NAME, utf8)), + max_length => 2, + overflow_strategy => reject_publish + }, + {State5, ok, Efx1} = apply(meta(5), rabbit_fifo:make_update_config(Conf), State4), + ?ASSERT_EFF({send_msg, P, {queue_status, reject_publish}, [ra_event]}, P == Pid1, Efx1), + Pid2 = test_util:fake_pid(node()), + {_State6, reject_publish, _} = apply(meta(1), make_register_enqueuer(Pid2), State5), + ok. + purge_nodes_test(_) -> Node = purged@node, ThisNode = node(), @@ -1412,6 +1510,7 @@ machine_version_test(_) -> %% Utility init(Conf) -> rabbit_fifo:init(Conf). +make_register_enqueuer(Pid) -> rabbit_fifo:make_register_enqueuer(Pid). apply(Meta, Entry, State) -> rabbit_fifo:apply(Meta, Entry, State). init_aux(Conf) -> rabbit_fifo:init_aux(Conf). handle_aux(S, T, C, A, L, M) -> rabbit_fifo:handle_aux(S, T, C, A, L, M). diff --git a/test/rabbit_fifo_prop_SUITE.erl b/test/rabbit_fifo_prop_SUITE.erl index 23522e71f9..dd2c7154d0 100644 --- a/test/rabbit_fifo_prop_SUITE.erl +++ b/test/rabbit_fifo_prop_SUITE.erl @@ -479,15 +479,17 @@ test_run_log(_Config) -> snapshots(_Config) -> run_proper( fun () -> - ?FORALL({Length, Bytes, SingleActiveConsumer, DeliveryLimit, InMemoryLength, - InMemoryBytes}, - frequency([{10, {0, 0, false, 0, 0, 0}}, + ?FORALL({Length, Bytes, SingleActiveConsumer, + DeliveryLimit, InMemoryLength, InMemoryBytes, + Overflow}, + frequency([{10, {0, 0, false, 0, 0, 0, drop_head}}, {5, {oneof([range(1, 10), undefined]), oneof([range(1, 1000), undefined]), boolean(), oneof([range(1, 3), undefined]), oneof([range(1, 10), undefined]), - oneof([range(1, 1000), undefined]) + oneof([range(1, 1000), undefined]), + oneof([drop_head, reject_publish]) }}]), begin Config = config(?FUNCTION_NAME, @@ -496,7 +498,8 @@ snapshots(_Config) -> SingleActiveConsumer, DeliveryLimit, InMemoryLength, - InMemoryBytes), + InMemoryBytes, + Overflow), ?FORALL(O, ?LET(Ops, log_gen(256), expand(Ops, Config)), collect({log_size, length(O)}, snapshots_prop(Config, O))) @@ -681,6 +684,11 @@ max_length(_Config) -> config(Name, Length, Bytes, SingleActive, DeliveryLimit, InMemoryLength, InMemoryBytes) -> +config(Name, Length, Bytes, SingleActive, DeliveryLimit, + InMemoryLength, InMemoryBytes, drop_head). + +config(Name, Length, Bytes, SingleActive, DeliveryLimit, + InMemoryLength, InMemoryBytes, Overflow) -> #{name => Name, max_length => map_max(Length), max_bytes => map_max(Bytes), @@ -688,7 +696,8 @@ config(Name, Length, Bytes, SingleActive, DeliveryLimit, single_active_consumer_on => SingleActive, delivery_limit => map_max(DeliveryLimit), max_in_memory_length => map_max(InMemoryLength), - max_in_memory_bytes => map_max(InMemoryBytes)}. + max_in_memory_bytes => map_max(InMemoryBytes), + overflow_strategy => Overflow}. map_max(0) -> undefined; map_max(N) -> N. |
