diff options
| author | kjnilsson <knilsson@pivotal.io> | 2020-07-10 10:20:25 +0100 |
|---|---|---|
| committer | kjnilsson <knilsson@pivotal.io> | 2020-09-07 09:42:10 +0100 |
| commit | c7fcee5a1f317be3f589fa739b9dbf1b6f7253ce (patch) | |
| tree | 61e6ddfc2504bbc0b9f5caeb7a656c68d593d076 /src | |
| parent | 6c7970e674772457f4f3cd8c66ca21b9ac5102a3 (diff) | |
| download | rabbitmq-server-git-c7fcee5a1f317be3f589fa739b9dbf1b6f7253ce.tar.gz | |
Implement reject_publish for QQs
The reject publish overflow strategy for quorum queues is an inexact
implementation that relies on the cooperation of publishing channels.
When a channel first wants to publish to a quorum queue it first issues
a synchonous register_enqueuer command which will return the current
queue overflow state as reject_publish if the queue is full.
The queue will also notify any active enqueuers when it reaches the
limit but will continue to accept any enqueues it receives after that.
Once the queue size goes below 80% of the limit(s) the queue will again
notify enqueuers that they can resume publishin inte the queue.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_fifo.erl | 209 | ||||
| -rw-r--r-- | src/rabbit_fifo.hrl | 12 | ||||
| -rw-r--r-- | src/rabbit_fifo_client.erl | 133 | ||||
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 58 |
4 files changed, 295 insertions, 117 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index 9ce39f5a64..9a2e3f3dc3 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -50,6 +50,7 @@ %% protocol helpers make_enqueue/3, + make_register_enqueuer/1, make_checkout/3, make_settle/2, make_return/2, @@ -64,6 +65,7 @@ -record(enqueue, {pid :: option(pid()), seq :: option(msg_seqno()), msg :: raw_msg()}). +-record(register_enqueuer, {pid :: pid()}). -record(checkout, {consumer_id :: consumer_id(), spec :: checkout_spec(), meta :: consumer_meta()}). @@ -83,6 +85,7 @@ -opaque protocol() :: #enqueue{} | + #register_enqueuer{} | #checkout{} | #settle{} | #return{} | @@ -164,9 +167,27 @@ zero(_) -> -spec apply(ra_machine:command_meta_data(), command(), state()) -> {state(), Reply :: term(), ra_machine:effects()} | {state(), Reply :: term()}. -apply(Metadata, #enqueue{pid = From, seq = Seq, - msg = RawMsg}, State00) -> - apply_enqueue(Metadata, From, Seq, RawMsg, State00); +apply(Meta, #enqueue{pid = From, seq = Seq, + msg = RawMsg}, State00) -> + apply_enqueue(Meta, From, Seq, RawMsg, State00); +apply(_Meta, #register_enqueuer{pid = Pid}, + #?MODULE{enqueuers = Enqueuers0, + cfg = #cfg{overflow_strategy = Overflow}} = State0) -> + + State = case maps:is_key(Pid, Enqueuers0) of + true -> + %% if the enqueuer exits just echo the overflow state + State0; + false -> + State0#?MODULE{enqueuers = Enqueuers0#{Pid => #enqueuer{}}} + end, + Res = case is_over_limit(State) of + true when Overflow == reject_publish -> + reject_publish; + _ -> + ok + end, + {State, Res, [{monitor, process, Pid}]}; apply(Meta, #settle{msg_ids = MsgIds, consumer_id = ConsumerId}, #?MODULE{consumers = Cons0} = State) -> @@ -215,8 +236,9 @@ apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt, ServiceQueue0), Cons = maps:put(ConsumerId, Con1, Cons0), {State1, ok, Effects} = - checkout(Meta, State0#?MODULE{service_queue = ServiceQueue, - consumers = Cons}, []), + checkout(Meta, State0, + State0#?MODULE{service_queue = ServiceQueue, + consumers = Cons}, []), Response = {send_credit_reply, messages_ready(State1)}, %% by this point all checkouts for the updated credit value %% should be processed so we can evaluate the drain @@ -262,7 +284,8 @@ apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt, apply(_, #checkout{spec = {dequeue, _}}, #?MODULE{cfg = #cfg{consumer_strategy = single_active}} = State0) -> {State0, {error, unsupported}}; -apply(#{from := From} = Meta, #checkout{spec = {dequeue, Settlement}, +apply(#{index := Index, + from := From} = Meta, #checkout{spec = {dequeue, Settlement}, meta = ConsumerMeta, consumer_id = ConsumerId}, #?MODULE{consumers = Consumers} = State0) -> @@ -278,57 +301,69 @@ apply(#{from := From} = Meta, #checkout{spec = {dequeue, Settlement}, {once, 1, simple_prefetch}, State0), {success, _, MsgId, Msg, State2} = checkout_one(State1), - {State, Effects} = case Settlement of - unsettled -> - {_, Pid} = ConsumerId, - {State2, [{monitor, process, Pid}]}; - settled -> - %% immediately settle the checkout - {State3, _, Effects0} = - apply(Meta, make_settle(ConsumerId, [MsgId]), - State2), - {State3, Effects0} - end, + {State4, Effects1} = case Settlement of + unsettled -> + {_, Pid} = ConsumerId, + {State2, [{monitor, process, Pid}]}; + settled -> + %% immediately settle the checkout + {State3, _, Effects0} = + apply(Meta, make_settle(ConsumerId, [MsgId]), + State2), + {State3, Effects0} + end, + {Reply, Effects2} = case Msg of {RaftIdx, {Header, 'empty'}} -> %% TODO add here new log effect with reply - {State, '$ra_no_reply', - reply_log_effect(RaftIdx, MsgId, Header, Ready - 1, From)}; + {'$ra_no_reply', + [reply_log_effect(RaftIdx, MsgId, Header, Ready - 1, From) | + Effects1]}; _ -> - {State, {dequeue, {MsgId, Msg}, Ready-1}, Effects} + {{dequeue, {MsgId, Msg}, Ready-1}, Effects1} + + end, + + case evaluate_limit(Index, false, State0, State4, Effects2) of + {State, true, Effects} -> + update_smallest_raft_index(Index, Reply, State, Effects); + {State, false, Effects} -> + {State, Reply, Effects} end end; apply(Meta, #checkout{spec = cancel, consumer_id = ConsumerId}, State0) -> {State, Effects} = cancel_consumer(ConsumerId, State0, [], consumer_cancel), - checkout(Meta, State, Effects); + checkout(Meta, State0, State, Effects); apply(Meta, #checkout{spec = Spec, meta = ConsumerMeta, consumer_id = {_, Pid} = ConsumerId}, State0) -> State1 = update_consumer(ConsumerId, ConsumerMeta, Spec, State0), - checkout(Meta, State1, [{monitor, process, Pid}]); -apply(#{index := RaftIdx}, #purge{}, + checkout(Meta, State0, State1, [{monitor, process, Pid}]); +apply(#{index := Index}, #purge{}, #?MODULE{ra_indexes = Indexes0, returns = Returns, messages = Messages} = State0) -> Total = messages_ready(State0), Indexes1 = lists:foldl(fun rabbit_fifo_index:delete/2, Indexes0, - [I || {_, {I, _}} <- lqueue:to_list(Messages)]), + [I || {_, {I, _}} <- lqueue:to_list(Messages)]), Indexes = lists:foldl(fun rabbit_fifo_index:delete/2, Indexes1, [I || {_, {I, _}} <- lqueue:to_list(Returns)]), - {State, _, Effects} = - update_smallest_raft_index(RaftIdx, - State0#?MODULE{ra_indexes = Indexes, - messages = lqueue:new(), - returns = lqueue:new(), - msg_bytes_enqueue = 0, - prefix_msgs = {0, [], 0, []}, - msg_bytes_in_memory = 0, - msgs_ready_in_memory = 0}, - []), - %% as we're not checking out after a purge (no point) we have to - %% reverse the effects ourselves - {State, {purge, Total}, - lists:reverse([garbage_collection | Effects])}; + + State1 = State0#?MODULE{ra_indexes = Indexes, + messages = lqueue:new(), + returns = lqueue:new(), + msg_bytes_enqueue = 0, + prefix_msgs = {0, [], 0, []}, + msg_bytes_in_memory = 0, + msgs_ready_in_memory = 0}, + Effects0 = [garbage_collection], + Reply = {purge, Total}, + case evaluate_limit(Index, false, State0, State1, Effects0) of + {State, true, Effects} -> + update_smallest_raft_index(Index, Reply, State, Effects); + {State, false, Effects} -> + {State, Reply, Effects} + end; apply(Meta, {down, Pid, noconnection}, #?MODULE{consumers = Cons0, cfg = #cfg{consumer_strategy = single_active}, @@ -375,7 +410,7 @@ apply(Meta, {down, Pid, noconnection}, (_, E) -> E end, Enqs0), Effects = [{monitor, node, Node} | Effects1], - checkout(Meta, State#?MODULE{enqueuers = Enqs}, Effects); + checkout(Meta, State0, State#?MODULE{enqueuers = Enqs}, Effects); apply(Meta, {down, Pid, noconnection}, #?MODULE{consumers = Cons0, enqueuers = Enqs0} = State0) -> @@ -418,10 +453,10 @@ apply(Meta, {down, Pid, noconnection}, _ -> [{monitor, node, Node}] end ++ Effects1, - checkout(Meta, State#?MODULE{enqueuers = Enqs}, Effects); + checkout(Meta, State0, State#?MODULE{enqueuers = Enqs}, Effects); apply(Meta, {down, Pid, _Info}, State0) -> {State, Effects} = handle_down(Pid, State0), - checkout(Meta, State, Effects); + checkout(Meta, State0, State, Effects); apply(Meta, {nodeup, Node}, #?MODULE{consumers = Cons0, enqueuers = Enqs0, service_queue = SQ0} = State0) -> @@ -456,7 +491,7 @@ apply(Meta, {nodeup, Node}, #?MODULE{consumers = Cons0, service_queue = SQ, waiting_consumers = Waiting}, {State, Effects} = activate_next_consumer(State1, Effects1), - checkout(Meta, State, Effects); + checkout(Meta, State0, State, Effects); apply(_, {nodedown, _Node}, State) -> {State, ok}; apply(_, #purge_nodes{nodes = Nodes}, State0) -> @@ -465,7 +500,7 @@ apply(_, #purge_nodes{nodes = Nodes}, State0) -> end, {State0, []}, Nodes), {State, ok, Effects}; apply(Meta, #update_config{config = Conf}, State) -> - checkout(Meta, update_config(Conf, State), []); + checkout(Meta, State, update_config(Conf, State), []); apply(_Meta, {machine_version, 0, 1}, V0State) -> State = convert_v0_to_v1(V0State), {State, ok, []}. @@ -1011,7 +1046,7 @@ apply_enqueue(#{index := RaftIdx} = Meta, From, Seq, RawMsg, State0) -> case maybe_enqueue(RaftIdx, From, Seq, RawMsg, [], State0) of {ok, State1, Effects1} -> State2 = append_to_master_index(RaftIdx, State1), - {State, ok, Effects} = checkout(Meta, State2, Effects1), + {State, ok, Effects} = checkout(Meta, State0, State2, Effects1), {maybe_store_dehydrated_state(RaftIdx, State), ok, Effects}; {duplicate, State, Effects} -> {State, ok, Effects} @@ -1173,7 +1208,7 @@ return(#{index := IncomingRaftIdx} = Meta, ConsumerId, Returned, _ -> {State1, Effects1} end, - {State, ok, Effects} = checkout(Meta, State2, Effects3), + {State, ok, Effects} = checkout(Meta, State0, State2, Effects3), update_smallest_raft_index(IncomingRaftIdx, State, Effects). % used to processes messages that are finished @@ -1221,7 +1256,7 @@ complete_and_checkout(#{index := IncomingRaftIdx} = Meta, MsgIds, ConsumerId, Discarded = maps:with(MsgIds, Checked0), {State2, Effects1} = complete(ConsumerId, Discarded, Con0, Effects0, State0), - {State, ok, Effects} = checkout(Meta, State2, Effects1), + {State, ok, Effects} = checkout(Meta, State0, State2, Effects1), update_smallest_raft_index(IncomingRaftIdx, State, Effects). dead_letter_effects(_Reason, _Discarded, @@ -1257,7 +1292,10 @@ cancel_consumer_effects(ConsumerId, [{mod_call, rabbit_quorum_queue, cancel_consumer_handler, [QName, ConsumerId]} | Effects]. -update_smallest_raft_index(IncomingRaftIdx, +update_smallest_raft_index(Idx, State, Effects) -> + update_smallest_raft_index(Idx, ok, State, Effects). + +update_smallest_raft_index(IncomingRaftIdx, Reply, #?MODULE{ra_indexes = Indexes, release_cursors = Cursors0} = State0, Effects) -> @@ -1267,17 +1305,16 @@ update_smallest_raft_index(IncomingRaftIdx, % we can forward release_cursor all the way until % the last received command, hooray State = State0#?MODULE{release_cursors = lqueue:new()}, - {State, ok, Effects ++ [{release_cursor, IncomingRaftIdx, State}]}; + {State, Reply, Effects ++ [{release_cursor, IncomingRaftIdx, State}]}; _ -> Smallest = rabbit_fifo_index:smallest(Indexes), case find_next_cursor(Smallest, Cursors0) of {empty, Cursors} -> - {State0#?MODULE{release_cursors = Cursors}, - ok, Effects}; + {State0#?MODULE{release_cursors = Cursors}, Reply, Effects}; {Cursor, Cursors} -> %% we can emit a release cursor when we've passed the smallest %% release cursor available. - {State0#?MODULE{release_cursors = Cursors}, ok, + {State0#?MODULE{release_cursors = Cursors}, Reply, Effects ++ [Cursor]} end end. @@ -1382,10 +1419,10 @@ return_all(#?MODULE{consumers = Cons} = State0, Effects0, ConsumerId, end, {State, Effects0}, Checked). %% checkout new messages to consumers -checkout(#{index := Index}, State0, Effects0) -> +checkout(#{index := Index}, OldState, State0, Effects0) -> {State1, _Result, Effects1} = checkout0(checkout_one(State0), Effects0, {#{}, #{}}), - case evaluate_limit(false, State1, Effects1) of + case evaluate_limit(Index, false, OldState, State1, Effects1) of {State, true, Effects} -> update_smallest_raft_index(Index, State, Effects); {State, false, Effects} -> @@ -1418,16 +1455,54 @@ checkout0({Activity, State0}, Effects0, {SendAcc, LogAcc}) -> end, {State0, ok, lists:reverse(Effects1)}. -evaluate_limit(Result, +evaluate_limit(_Index, Result, _BeforeState, #?MODULE{cfg = #cfg{max_length = undefined, max_bytes = undefined}} = State, Effects) -> {State, Result, Effects}; -evaluate_limit(Result, State0, Effects0) -> +evaluate_limit(Index, Result, BeforeState, + #?MODULE{cfg = #cfg{overflow_strategy = Strategy}, + enqueuers = Enqs0} = State0, + Effects0) -> case is_over_limit(State0) of - true -> + true when Strategy == drop_head -> {State, Effects} = drop_head(State0, Effects0), - evaluate_limit(true, State, Effects); + evaluate_limit(Index, true, BeforeState, State, Effects); + true when Strategy == reject_publish -> + %% generate send_msg effect for each enqueuer to let them know + %% they need to block + {Enqs, Effects} = + maps:fold( + fun (P, #enqueuer{blocked = undefined} = E0, {Enqs, Acc}) -> + E = E0#enqueuer{blocked = Index}, + {Enqs#{P => E}, + [{send_msg, P, {queue_status, reject_publish}, + [ra_event]} | Acc]}; + (_P, _E, Acc) -> + Acc + end, {Enqs0, Effects0}, Enqs0), + {State0#?MODULE{enqueuers = Enqs}, Result, Effects}; + false when Strategy == reject_publish -> + %% TODO: optimise as this case gets called for every command + %% pretty much + Before = is_below_soft_limit(BeforeState), + case {Before, is_below_soft_limit(State0)} of + {false, true} -> + %% we have moved below the lower limit which + {Enqs, Effects} = + maps:fold( + fun (P, #enqueuer{} = E0, {Enqs, Acc}) -> + E = E0#enqueuer{blocked = undefined}, + {Enqs#{P => E}, + [{send_msg, P, {queue_status, go}, [ra_event]} + | Acc]}; + (_P, _E, Acc) -> + Acc + end, {Enqs0, Effects0}, Enqs0), + {State0#?MODULE{enqueuers = Enqs}, Result, Effects}; + _ -> + {State0, Result, Effects0} + end; false -> {State0, Result, Effects0} end. @@ -1753,12 +1828,30 @@ is_over_limit(#?MODULE{cfg = #cfg{max_length = undefined, is_over_limit(#?MODULE{cfg = #cfg{max_length = MaxLength, max_bytes = MaxBytes}, msg_bytes_enqueue = BytesEnq} = State) -> - messages_ready(State) > MaxLength orelse (BytesEnq > MaxBytes). +is_below_soft_limit(#?MODULE{cfg = #cfg{max_length = undefined, + max_bytes = undefined}}) -> + false; +is_below_soft_limit(#?MODULE{cfg = #cfg{max_length = MaxLength, + max_bytes = MaxBytes}, + msg_bytes_enqueue = BytesEnq} = State) -> + is_below(MaxLength, messages_ready(State)) andalso + is_below(MaxBytes, BytesEnq). + +is_below(undefined, _Num) -> + true; +is_below(Val, Num) when is_integer(Val) andalso is_integer(Num) -> + Num =< trunc(Val * ?LOW_LIMIT). + -spec make_enqueue(option(pid()), option(msg_seqno()), raw_msg()) -> protocol(). make_enqueue(Pid, Seq, Msg) -> #enqueue{pid = Pid, seq = Seq, msg = Msg}. + +-spec make_register_enqueuer(pid()) -> protocol(). +make_register_enqueuer(Pid) -> + #register_enqueuer{pid = Pid}. + -spec make_checkout(consumer_id(), checkout_spec(), consumer_meta()) -> protocol(). make_checkout(ConsumerId, Spec, Meta) -> diff --git a/src/rabbit_fifo.hrl b/src/rabbit_fifo.hrl index e337540c35..dc01b6ec1c 100644 --- a/src/rabbit_fifo.hrl +++ b/src/rabbit_fifo.hrl @@ -75,6 +75,7 @@ -define(GC_MEM_LIMIT_B, 2000000). -define(MB, 1048576). +-define(LOW_LIMIT, 0.8). -record(consumer, {meta = #{} :: consumer_meta(), @@ -105,11 +106,11 @@ % out of order enqueues - sorted list pending = [] :: [{msg_seqno(), ra:index(), raw_msg()}], status = up :: up | - suspected_down | - %% it is useful to have a record of when this was blocked - %% so that we can retry sending the block effect if - %% the publisher did not receive the initial one - {blocked, At :: ra:index()} + suspected_down, + %% it is useful to have a record of when this was blocked + %% so that we can retry sending the block effect if + %% the publisher did not receive the initial one + blocked :: undefined | ra:index() }). -record(cfg, @@ -191,5 +192,6 @@ max_bytes => non_neg_integer(), max_in_memory_length => non_neg_integer(), max_in_memory_bytes => non_neg_integer(), + overflow_strategy => drop_head | reject_publish, single_active_consumer_on => boolean(), delivery_limit => non_neg_integer()}. diff --git a/src/rabbit_fifo_client.erl b/src/rabbit_fifo_client.erl index 2ac9f6787a..a698f93e9e 100644 --- a/src/rabbit_fifo_client.erl +++ b/src/rabbit_fifo_client.erl @@ -53,9 +53,17 @@ -record(consumer, {last_msg_id :: seq() | -1, delivery_count = 0 :: non_neg_integer()}). --record(state, {cluster_name :: cluster_name(), - servers = [] :: [ra:server_id()], +-record(cfg, {cluster_name :: cluster_name(), + servers = [] :: [ra:server_id()], + soft_limit = ?SOFT_LIMIT :: non_neg_integer(), + block_handler = fun() -> ok end :: fun(() -> term()), + unblock_handler = fun() -> ok end :: fun(() -> ok), + timeout :: non_neg_integer(), + version = 0 :: non_neg_integer()}). + +-record(state, {cfg :: #cfg{}, leader :: undefined | ra:server_id(), + queue_status :: undefined | go | reject_publish, next_seq = 0 :: seq(), %% Last applied is initialise to -1 to note that no command has yet been %% applied, but allowing to resend messages if the first ones on the sequence @@ -66,15 +74,11 @@ slow = false :: boolean(), unsent_commands = #{} :: #{rabbit_fifo:consumer_id() => {[seq()], [seq()], [seq()]}}, - soft_limit = ?SOFT_LIMIT :: non_neg_integer(), pending = #{} :: #{seq() => {term(), rabbit_fifo:command()}}, consumer_deliveries = #{} :: #{rabbit_fifo:consumer_tag() => #consumer{}}, - block_handler = fun() -> ok end :: fun(() -> term()), - unblock_handler = fun() -> ok end :: fun(() -> ok), - timer_state :: term(), - timeout :: non_neg_integer() + timer_state :: term() }). -opaque state() :: #state{}. @@ -103,21 +107,22 @@ init(ClusterName, Servers) -> -spec init(cluster_name(), [ra:server_id()], non_neg_integer()) -> state(). init(ClusterName = #resource{}, Servers, SoftLimit) -> Timeout = application:get_env(kernel, net_ticktime, 60000) + 5000, - #state{cluster_name = ClusterName, - servers = Servers, - soft_limit = SoftLimit, - timeout = Timeout}. + #state{cfg = #cfg{cluster_name = ClusterName, + servers = Servers, + soft_limit = SoftLimit, + timeout = Timeout}}. -spec init(cluster_name(), [ra:server_id()], non_neg_integer(), fun(() -> ok), fun(() -> ok)) -> state(). init(ClusterName = #resource{}, Servers, SoftLimit, BlockFun, UnblockFun) -> Timeout = application:get_env(kernel, net_ticktime, 60000) + 5000, - #state{cluster_name = ClusterName, - servers = Servers, - block_handler = BlockFun, - unblock_handler = UnblockFun, - soft_limit = SoftLimit, - timeout = Timeout}. + #state{cfg = #cfg{cluster_name = ClusterName, + servers = Servers, + block_handler = BlockFun, + unblock_handler = UnblockFun, + soft_limit = SoftLimit, + timeout = Timeout}}. + %% @doc Enqueues a message. %% @param Correlation an arbitrary erlang term used to correlate this @@ -132,10 +137,41 @@ init(ClusterName = #resource{}, Servers, SoftLimit, BlockFun, UnblockFun) -> %% SequenceNumber can be correlated to the applied sequence numbers returned %% by the {@link handle_ra_event/2. handle_ra_event/2} function. -spec enqueue(Correlation :: term(), Msg :: term(), State :: state()) -> - {ok | slow, state()}. -enqueue(Correlation, Msg, State0 = #state{slow = Slow, - block_handler = BlockFun}) -> - Node = pick_node(State0), + {ok | slow | reject_publish, state()}. +enqueue(Correlation, Msg, + #state{queue_status = undefined, + next_enqueue_seq = 1, + cfg = #cfg{}} = State0) -> + %% it is the first enqueue, check the version + {_, Node} = Server = pick_server(State0), + State = + case rpc:call(Node, rabbit_fifo, version, []) of + 0 -> + %% the leader is running the old version + %% so we can't initialize the enqueuer session safely + State0#state{queue_status = go}; + 1 -> + %% were running the new version on the leader do sync initialisation + %% of enqueuer session + Reg = rabbit_fifo:make_register_enqueuer(self()), + case ra:process_command(Server, Reg) of + {ok, reject_publish, _} -> + State0#state{queue_status = reject_publish}; + {ok, ok, _} -> + State0#state{queue_status = go}; + Err -> + exit(Err) + end + end, + enqueue(Correlation, Msg, State); +enqueue(_Correlation, _Msg, + #state{queue_status = reject_publish, + cfg = #cfg{}} = State) -> + {reject_publish, State}; +enqueue(Correlation, Msg, + #state{slow = Slow, + cfg = #cfg{block_handler = BlockFun}} = State0) -> + Node = pick_server(State0), {Next, State1} = next_enqueue_seq(State0), % by default there is no correlation id Cmd = rabbit_fifo:make_enqueue(self(), Next, Msg), @@ -159,7 +195,7 @@ enqueue(Correlation, Msg, State0 = #state{slow = Slow, %% by the {@link handle_ra_event/2. handle_ra_event/2} function. %% -spec enqueue(Msg :: term(), State :: state()) -> - {ok | slow, state()}. + {ok | slow | reject_publish, state()}. enqueue(Msg, State) -> enqueue(undefined, Msg, State). @@ -178,8 +214,9 @@ enqueue(Msg, State) -> Settlement :: settled | unsettled, state()) -> {ok, {rabbit_fifo:delivery_msg(), non_neg_integer()} | empty, state()} | {error | timeout, term()}. -dequeue(ConsumerTag, Settlement, #state{timeout = Timeout} = State0) -> - Node = pick_node(State0), +dequeue(ConsumerTag, Settlement, + #state{cfg = #cfg{timeout = Timeout}} = State0) -> + Node = pick_server(State0), ConsumerId = consumer_id(ConsumerTag), case ra:process_command(Node, rabbit_fifo:make_checkout(ConsumerId, @@ -189,7 +226,8 @@ dequeue(ConsumerTag, Settlement, #state{timeout = Timeout} = State0) -> {ok, {dequeue, empty}, Leader} -> {ok, empty, State0#state{leader = Leader}}; {ok, {dequeue, Msg, NumReady}, Leader} -> - {ok, {Msg, NumReady}, State0#state{leader = Leader}}; + {ok, {Msg, NumReady}, + State0#state{leader = Leader}}; {ok, {error, _} = Err, _Leader} -> Err; Err -> @@ -208,7 +246,7 @@ dequeue(ConsumerTag, Settlement, #state{timeout = Timeout} = State0) -> -spec settle(rabbit_fifo:consumer_tag(), [rabbit_fifo:msg_id()], state()) -> {ok, state()}. settle(ConsumerTag, [_|_] = MsgIds, #state{slow = false} = State0) -> - Node = pick_node(State0), + Node = pick_server(State0), Cmd = rabbit_fifo:make_settle(consumer_id(ConsumerTag), MsgIds), case send_command(Node, undefined, Cmd, normal, State0) of {slow, S} -> @@ -241,7 +279,7 @@ settle(ConsumerTag, [_|_] = MsgIds, -spec return(rabbit_fifo:consumer_tag(), [rabbit_fifo:msg_id()], state()) -> {ok, state()}. return(ConsumerTag, [_|_] = MsgIds, #state{slow = false} = State0) -> - Node = pick_node(State0), + Node = pick_server(State0), % TODO: make rabbit_fifo return support lists of message ids Cmd = rabbit_fifo:make_return(consumer_id(ConsumerTag), MsgIds), case send_command(Node, undefined, Cmd, normal, State0) of @@ -275,7 +313,7 @@ return(ConsumerTag, [_|_] = MsgIds, -spec discard(rabbit_fifo:consumer_tag(), [rabbit_fifo:msg_id()], state()) -> {ok | slow, state()}. discard(ConsumerTag, [_|_] = MsgIds, #state{slow = false} = State0) -> - Node = pick_node(State0), + Node = pick_server(State0), Cmd = rabbit_fifo:make_discard(consumer_id(ConsumerTag), MsgIds), case send_command(Node, undefined, Cmd, normal, State0) of {slow, S} -> @@ -363,7 +401,7 @@ credit(ConsumerTag, Credit, Drain, %% the last received msgid provides us with the delivery count if we %% add one as it is 0 indexed C = maps:get(ConsumerTag, CDels, #consumer{last_msg_id = -1}), - Node = pick_node(State0), + Node = pick_server(State0), Cmd = rabbit_fifo:make_credit(ConsumerId, Credit, C#consumer.last_msg_id + 1, Drain), case send_command(Node, undefined, Cmd, normal, State0) of @@ -429,11 +467,11 @@ stat(Leader, Timeout) -> %% @doc returns the cluster name -spec cluster_name(state()) -> cluster_name(). -cluster_name(#state{cluster_name = ClusterName}) -> +cluster_name(#state{cfg = #cfg{cluster_name = ClusterName}}) -> ClusterName. -update_machine_state(Node, Conf) -> - case ra:process_command(Node, rabbit_fifo:make_update_config(Conf)) of +update_machine_state(Server, Conf) -> + case ra:process_command(Server, rabbit_fifo:make_update_config(Conf)) of {ok, ok, _} -> ok; Err -> @@ -487,10 +525,11 @@ update_machine_state(Node, Conf) -> {internal, Correlators :: [term()], actions(), state()} | {rabbit_fifo:client_msg(), state()} | eol. handle_ra_event(From, {applied, Seqs}, - #state{soft_limit = SftLmt, - unblock_handler = UnblockFun} = State0) -> + #state{cfg = #cfg{soft_limit = SftLmt, + unblock_handler = UnblockFun}} = State00) -> + State0 = State00#state{leader = From}, {Corrs, Actions, State1} = lists:foldl(fun seq_applied/2, - {[], [], State0#state{leader = From}}, + {[], [], State0}, Seqs), case maps:size(State1#state.pending) < SftLmt of true when State1#state.slow == true -> @@ -511,7 +550,7 @@ handle_ra_event(From, {applied, Seqs}, add_command(Cid, discard, Discards, Acc))) end, [], State1#state.unsent_commands), - Node = pick_node(State2), + Node = pick_server(State2), %% send all the settlements and returns State = lists:foldl(fun (C, S0) -> case send_command(Node, undefined, @@ -527,6 +566,10 @@ handle_ra_event(From, {applied, Seqs}, end; handle_ra_event(From, {machine, {delivery, _ConsumerTag, _} = Del}, State0) -> handle_delivery(From, Del, State0); +handle_ra_event(_, {machine, {queue_status, Status}}, + #state{} = State) -> + %% just set the queue status + {internal, [], [], State#state{queue_status = Status}}; handle_ra_event(Leader, {machine, leader_change}, #state{leader = Leader} = State) -> %% leader already known @@ -543,7 +586,7 @@ handle_ra_event(_From, {rejected, {not_leader, Leader, Seq}}, State0) -> State1 = State0#state{leader = Leader}, State = resend(Seq, State1), {internal, [], [], State}; -handle_ra_event(_, timeout, #state{servers = Servers} = State0) -> +handle_ra_event(_, timeout, #state{cfg = #cfg{servers = Servers}} = State0) -> case find_leader(Servers) of undefined -> %% still no leader, set the timer again @@ -642,7 +685,6 @@ resend(OldSeq, #state{pending = Pending0, leader = Leader} = State) -> error -> State end. - resend_all_pending(#state{pending = Pend} = State) -> Seqs = lists:sort(maps:keys(Pend)), lists:foldl(fun resend/2, State, Seqs). @@ -719,16 +761,19 @@ get_missing_deliveries(Leader, From, To, ConsumerTag) -> [Leader]) end. -pick_node(#state{leader = undefined, servers = [N | _]}) -> +pick_server(#state{leader = undefined, + cfg = #cfg{servers = [N | _]}}) -> %% TODO: pick random rather that first? N; -pick_node(#state{leader = Leader}) -> +pick_server(#state{leader = Leader}) -> Leader. % servers sorted by last known leader -sorted_servers(#state{leader = undefined, servers = Servers}) -> +sorted_servers(#state{leader = undefined, + cfg = #cfg{servers = Servers}}) -> Servers; -sorted_servers(#state{leader = Leader, servers = Servers}) -> +sorted_servers(#state{leader = Leader, + cfg = #cfg{servers = Servers}}) -> [Leader | lists:delete(Leader, Servers)]. next_seq(#state{next_seq = Seq} = State) -> @@ -742,7 +787,7 @@ consumer_id(ConsumerTag) -> send_command(Server, Correlation, Command, Priority, #state{pending = Pending, - soft_limit = SftLmt} = State0) -> + cfg = #cfg{soft_limit = SftLmt}} = State0) -> {Seq, State} = next_seq(State0), ok = ra:pipeline_command(Server, Command, Seq, Priority), Tag = case maps:size(Pending) >= SftLmt of @@ -767,7 +812,7 @@ add_command(Cid, return, MsgIds, Acc) -> add_command(Cid, discard, MsgIds, Acc) -> [rabbit_fifo:make_discard(Cid, MsgIds) | Acc]. -set_timer(#state{servers = [Server | _]} = State) -> +set_timer(#state{cfg = #cfg{servers = [Server | _]}} = State) -> Ref = erlang:send_after(?TIMER_TIME, self(), {ra_event, Server, timeout}), State#state{timer_state = Ref}. diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index 915cf9d527..22333f6bdf 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -125,6 +125,12 @@ declare(Q) when ?amqqueue_is_quorum(Q) -> || ServerId <- members(NewQ)], case ra:start_cluster(RaConfs) of {ok, _, _} -> + %% TODO: handle error - what should be done if the + %% config cannot be updated + ok = rabbit_fifo_client:update_machine_state(Id, + ra_machine_config(NewQ)), + %% force a policy change to ensure the latest config is + %% updated even when running the machine version from 0 rabbit_event:notify(queue_created, [{name, QName}, {durable, Durable}, @@ -152,6 +158,12 @@ ra_machine_config(Q) when ?is_amqqueue(Q) -> {Name, _} = amqqueue:get_pid(Q), %% take the minimum value of the policy and the queue arg if present MaxLength = args_policy_lookup(<<"max-length">>, fun min/2, Q), + Overflow = args_policy_lookup(<<"overflow">>, + fun (A, B) -> + rabbit_log:info("RESOLVE ~p", [{A, B}]), + A + end , Q), + rabbit_log:info("OVERFLOW ~p ARGS ~p", [Overflow, amqqueue:get_arguments(Q)]), MaxBytes = args_policy_lookup(<<"max-length-bytes">>, fun min/2, Q), MaxMemoryLength = args_policy_lookup(<<"max-in-memory-length">>, fun min/2, Q), MaxMemoryBytes = args_policy_lookup(<<"max-in-memory-bytes">>, fun min/2, Q), @@ -165,7 +177,8 @@ ra_machine_config(Q) when ?is_amqqueue(Q) -> max_in_memory_length => MaxMemoryLength, max_in_memory_bytes => MaxMemoryBytes, single_active_consumer_on => single_active_consumer_on(Q), - delivery_limit => DeliveryLimit + delivery_limit => DeliveryLimit, + overflow_strategy => overflow(Overflow, drop_head) }. single_active_consumer_on(Q) -> @@ -292,8 +305,13 @@ filter_quorum_critical(Queues, ReplicaStates) -> -spec is_policy_applicable(amqqueue:amqqueue(), any()) -> boolean(). is_policy_applicable(_Q, Policy) -> - Applicable = [<<"max-length">>, <<"max-length-bytes">>, <<"max-in-memory-length">>, - <<"max-in-memory-bytes">>, <<"delivery-limit">>, <<"dead-letter-exchange">>, + Applicable = [<<"max-length">>, + <<"max-length-bytes">>, + <<"x-overflow">>, + <<"max-in-memory-length">>, + <<"max-in-memory-bytes">>, + <<"delivery-limit">>, + <<"dead-letter-exchange">>, <<"dead-letter-routing-key">>], lists:all(fun({P, _}) -> lists:member(P, Applicable) @@ -694,13 +712,29 @@ stateless_deliver(ServerId, Delivery) -> -spec deliver(Confirm :: boolean(), rabbit_types:delivery(), rabbit_fifo_client:state()) -> - {ok | slow, rabbit_fifo_client:state()}. + {ok | slow, rabbit_fifo_client:state()} | + {reject_publish, rabbit_fifo_client:state()}. deliver(false, Delivery, QState0) -> - rabbit_fifo_client:enqueue(Delivery#delivery.message, QState0); + case rabbit_fifo_client:enqueue(Delivery#delivery.message, QState0) of + {ok, _} = Res -> Res; + {slow, _} = Res -> Res; + {reject_publish, State} -> + {ok, State} + end; deliver(true, Delivery, QState0) -> - rabbit_fifo_client:enqueue(Delivery#delivery.msg_seq_no, - Delivery#delivery.message, QState0). + Seq = Delivery#delivery.msg_seq_no, + case rabbit_fifo_client:enqueue(Delivery#delivery.msg_seq_no, + Delivery#delivery.message, QState0) of + {ok, _} = Res -> Res; + {slow, _} = Res -> Res; + {reject_publish, State} -> + %% TODO: this works fine but once the queue types interface is in + %% place it could be replaced with an action or similar to avoid + %% self publishing messages. + gen_server2:cast(self(), {reject_publish, Seq, undefined}), + {ok, State} + end. -spec info(amqqueue:amqqueue()) -> rabbit_types:infos(). @@ -783,9 +817,9 @@ maybe_delete_data_dir(UId) -> ok end. -policy_changed(QName, Node) -> +policy_changed(QName, Server) -> {ok, Q} = rabbit_amqqueue:lookup(QName), - rabbit_fifo_client:update_machine_state(Node, ra_machine_config(Q)). + rabbit_fifo_client:update_machine_state(Server, ra_machine_config(Q)). -spec cluster_state(Name :: atom()) -> 'down' | 'recovering' | 'running'. @@ -1321,7 +1355,7 @@ maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg). check_invalid_arguments(QueueName, Args) -> Keys = [<<"x-expires">>, <<"x-message-ttl">>, - <<"x-max-priority">>, <<"x-queue-mode">>, <<"x-overflow">>], + <<"x-max-priority">>, <<"x-queue-mode">>], [case rabbit_misc:table_lookup(Args, Key) of undefined -> ok; _TypeVal -> rabbit_misc:protocol_error( @@ -1413,3 +1447,7 @@ get_nodes(Q) when ?is_amqqueue(Q) -> update_type_state(Q, Fun) when ?is_amqqueue(Q) -> Ts = amqqueue:get_type_state(Q), amqqueue:set_type_state(Q, Fun(Ts)). + +overflow(undefined, Def) -> Def; +overflow(<<"reject-publish">>, _Def) -> reject_publish; +overflow(<<"drop-head">>, _Def) -> drop_head. |
