diff options
Diffstat (limited to 'deps/rabbit/src/rabbit_fifo_client.erl')
-rw-r--r-- | deps/rabbit/src/rabbit_fifo_client.erl | 888 |
1 files changed, 888 insertions, 0 deletions
diff --git a/deps/rabbit/src/rabbit_fifo_client.erl b/deps/rabbit/src/rabbit_fifo_client.erl new file mode 100644 index 0000000000..3990222b15 --- /dev/null +++ b/deps/rabbit/src/rabbit_fifo_client.erl @@ -0,0 +1,888 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +%% @doc Provides an easy to consume API for interacting with the {@link rabbit_fifo.} +%% state machine implementation running inside a `ra' raft system. +%% +%% Handles command tracking and other non-functional concerns. +-module(rabbit_fifo_client). + +-export([ + init/2, + init/3, + init/5, + checkout/5, + cancel_checkout/2, + enqueue/2, + enqueue/3, + dequeue/3, + settle/3, + return/3, + discard/3, + credit/4, + handle_ra_event/3, + untracked_enqueue/2, + purge/1, + cluster_name/1, + update_machine_state/2, + pending_size/1, + stat/1, + stat/2 + ]). + +-include_lib("rabbit_common/include/rabbit.hrl"). + +-define(SOFT_LIMIT, 32). +-define(TIMER_TIME, 10000). + +-type seq() :: non_neg_integer(). +%% last_applied is initialised to -1 +-type maybe_seq() :: integer(). +-type action() :: {send_credit_reply, Available :: non_neg_integer()} | + {send_drained, CTagCredit :: + {rabbit_fifo:consumer_tag(), non_neg_integer()}}. +-type actions() :: [action()]. + +-type cluster_name() :: rabbit_types:r(queue). + +-record(consumer, {last_msg_id :: seq() | -1, + ack = false :: boolean(), + delivery_count = 0 :: non_neg_integer()}). + +-record(cfg, {cluster_name :: cluster_name(), + servers = [] :: [ra:server_id()], + soft_limit = ?SOFT_LIMIT :: non_neg_integer(), + block_handler = fun() -> ok end :: fun(() -> term()), + unblock_handler = fun() -> ok end :: fun(() -> ok), + timeout :: non_neg_integer(), + version = 0 :: non_neg_integer()}). + +-record(state, {cfg :: #cfg{}, + leader :: undefined | ra:server_id(), + queue_status :: undefined | go | reject_publish, + next_seq = 0 :: seq(), + %% Last applied is initialise to -1 to note that no command has yet been + %% applied, but allowing to resend messages if the first ones on the sequence + %% are lost (messages are sent from last_applied + 1) + last_applied = -1 :: maybe_seq(), + next_enqueue_seq = 1 :: seq(), + %% indicates that we've exceeded the soft limit + slow = false :: boolean(), + unsent_commands = #{} :: #{rabbit_fifo:consumer_id() => + {[seq()], [seq()], [seq()]}}, + pending = #{} :: #{seq() => + {term(), rabbit_fifo:command()}}, + consumer_deliveries = #{} :: #{rabbit_fifo:consumer_tag() => + #consumer{}}, + timer_state :: term() + }). + +-opaque state() :: #state{}. + +-export_type([ + state/0, + actions/0 + ]). + + +%% @doc Create the initial state for a new rabbit_fifo sessions. A state is needed +%% to interact with a rabbit_fifo queue using @module. +%% @param ClusterName the id of the cluster to interact with +%% @param Servers The known servers of the queue. If the current leader is known +%% ensure the leader node is at the head of the list. +-spec init(cluster_name(), [ra:server_id()]) -> state(). +init(ClusterName, Servers) -> + init(ClusterName, Servers, ?SOFT_LIMIT). + +%% @doc Create the initial state for a new rabbit_fifo sessions. A state is needed +%% to interact with a rabbit_fifo queue using @module. +%% @param ClusterName the id of the cluster to interact with +%% @param Servers The known servers of the queue. If the current leader is known +%% ensure the leader node is at the head of the list. +%% @param MaxPending size defining the max number of pending commands. +-spec init(cluster_name(), [ra:server_id()], non_neg_integer()) -> state(). +init(ClusterName = #resource{}, Servers, SoftLimit) -> + Timeout = application:get_env(kernel, net_ticktime, 60) + 5, + #state{cfg = #cfg{cluster_name = ClusterName, + servers = Servers, + soft_limit = SoftLimit, + timeout = Timeout * 1000}}. + +-spec init(cluster_name(), [ra:server_id()], non_neg_integer(), fun(() -> ok), + fun(() -> ok)) -> state(). +init(ClusterName = #resource{}, Servers, SoftLimit, BlockFun, UnblockFun) -> + %% net ticktime is in seconds + Timeout = application:get_env(kernel, net_ticktime, 60) + 5, + #state{cfg = #cfg{cluster_name = ClusterName, + servers = Servers, + block_handler = BlockFun, + unblock_handler = UnblockFun, + soft_limit = SoftLimit, + timeout = Timeout * 1000}}. + + +%% @doc Enqueues a message. +%% @param Correlation an arbitrary erlang term used to correlate this +%% command when it has been applied. +%% @param Msg an arbitrary erlang term representing the message. +%% @param State the current {@module} state. +%% @returns +%% `{ok | slow, State}' if the command was successfully sent. If the return +%% tag is `slow' it means the limit is approaching and it is time to slow down +%% the sending rate. +%% {@module} assigns a sequence number to every raft command it issues. The +%% SequenceNumber can be correlated to the applied sequence numbers returned +%% by the {@link handle_ra_event/2. handle_ra_event/2} function. +-spec enqueue(Correlation :: term(), Msg :: term(), State :: state()) -> + {ok | slow | reject_publish, state()}. +enqueue(Correlation, Msg, + #state{queue_status = undefined, + next_enqueue_seq = 1, + cfg = #cfg{timeout = Timeout}} = State0) -> + %% it is the first enqueue, check the version + {_, Node} = Server = pick_server(State0), + case rpc:call(Node, ra_machine, version, [{machine, rabbit_fifo, #{}}]) of + 0 -> + %% the leader is running the old version + %% so we can't initialize the enqueuer session safely + %% fall back on old behavour + enqueue(Correlation, Msg, State0#state{queue_status = go}); + 1 -> + %% were running the new version on the leader do sync initialisation + %% of enqueuer session + Reg = rabbit_fifo:make_register_enqueuer(self()), + case ra:process_command(Server, Reg, Timeout) of + {ok, reject_publish, _} -> + {reject_publish, State0#state{queue_status = reject_publish}}; + {ok, ok, _} -> + enqueue(Correlation, Msg, State0#state{queue_status = go}); + {timeout, _} -> + %% if we timeout it is probably better to reject + %% the message than being uncertain + {reject_publish, State0}; + Err -> + exit(Err) + end; + {badrpc, nodedown} -> + {reject_publish, State0} + end; +enqueue(_Correlation, _Msg, + #state{queue_status = reject_publish, + cfg = #cfg{}} = State) -> + {reject_publish, State}; +enqueue(Correlation, Msg, + #state{slow = Slow, + queue_status = go, + cfg = #cfg{block_handler = BlockFun}} = State0) -> + Node = pick_server(State0), + {Next, State1} = next_enqueue_seq(State0), + % by default there is no correlation id + Cmd = rabbit_fifo:make_enqueue(self(), Next, Msg), + case send_command(Node, Correlation, Cmd, low, State1) of + {slow, State} when not Slow -> + BlockFun(), + {slow, set_timer(State)}; + Any -> + Any + end. + +%% @doc Enqueues a message. +%% @param Msg an arbitrary erlang term representing the message. +%% @param State the current {@module} state. +%% @returns +%% `{ok | slow, State}' if the command was successfully sent. If the return +%% tag is `slow' it means the limit is approaching and it is time to slow down +%% the sending rate. +%% {@module} assigns a sequence number to every raft command it issues. The +%% SequenceNumber can be correlated to the applied sequence numbers returned +%% by the {@link handle_ra_event/2. handle_ra_event/2} function. +%% +-spec enqueue(Msg :: term(), State :: state()) -> + {ok | slow | reject_publish, state()}. +enqueue(Msg, State) -> + enqueue(undefined, Msg, State). + +%% @doc Dequeue a message from the queue. +%% +%% This is a synchronous call. I.e. the call will block until the command +%% has been accepted by the ra process or it times out. +%% +%% @param ConsumerTag a unique tag to identify this particular consumer. +%% @param Settlement either `settled' or `unsettled'. When `settled' no +%% further settlement needs to be done. +%% @param State The {@module} state. +%% +%% @returns `{ok, IdMsg, State}' or `{error | timeout, term()}' +-spec dequeue(rabbit_fifo:consumer_tag(), + Settlement :: settled | unsettled, state()) -> + {ok, non_neg_integer(), term(), non_neg_integer()} + | {empty, state()} | {error | timeout, term()}. +dequeue(ConsumerTag, Settlement, + #state{cfg = #cfg{timeout = Timeout, + cluster_name = QName}} = State0) -> + Node = pick_server(State0), + ConsumerId = consumer_id(ConsumerTag), + case ra:process_command(Node, + rabbit_fifo:make_checkout(ConsumerId, + {dequeue, Settlement}, + #{}), + Timeout) of + {ok, {dequeue, empty}, Leader} -> + {empty, State0#state{leader = Leader}}; + {ok, {dequeue, {MsgId, {MsgHeader, Msg0}}, MsgsReady}, Leader} -> + Count = case MsgHeader of + #{delivery_count := C} -> C; + _ -> 0 + end, + IsDelivered = Count > 0, + Msg = add_delivery_count_header(Msg0, Count), + {ok, MsgsReady, + {QName, qref(Leader), MsgId, IsDelivered, Msg}, + State0#state{leader = Leader}}; + {ok, {error, _} = Err, _Leader} -> + Err; + Err -> + Err + end. + +add_delivery_count_header(#basic_message{} = Msg0, Count) + when is_integer(Count) -> + rabbit_basic:add_header(<<"x-delivery-count">>, long, Count, Msg0); +add_delivery_count_header(Msg, _Count) -> + Msg. + + +%% @doc Settle a message. Permanently removes message from the queue. +%% @param ConsumerTag the tag uniquely identifying the consumer. +%% @param MsgIds the message ids received with the {@link rabbit_fifo:delivery/0.} +%% @param State the {@module} state +%% @returns +%% `{ok | slow, State}' if the command was successfully sent. If the return +%% tag is `slow' it means the limit is approaching and it is time to slow down +%% the sending rate. +%% +-spec settle(rabbit_fifo:consumer_tag(), [rabbit_fifo:msg_id()], state()) -> + {state(), list()}. +settle(ConsumerTag, [_|_] = MsgIds, #state{slow = false} = State0) -> + Node = pick_server(State0), + Cmd = rabbit_fifo:make_settle(consumer_id(ConsumerTag), MsgIds), + case send_command(Node, undefined, Cmd, normal, State0) of + {_, S} -> + % turn slow into ok for this function + {S, []} + end; +settle(ConsumerTag, [_|_] = MsgIds, + #state{unsent_commands = Unsent0} = State0) -> + ConsumerId = consumer_id(ConsumerTag), + %% we've reached the soft limit so will stash the command to be + %% sent once we have seen enough notifications + Unsent = maps:update_with(ConsumerId, + fun ({Settles, Returns, Discards}) -> + {Settles ++ MsgIds, Returns, Discards} + end, {MsgIds, [], []}, Unsent0), + {State0#state{unsent_commands = Unsent}, []}. + +%% @doc Return a message to the queue. +%% @param ConsumerTag the tag uniquely identifying the consumer. +%% @param MsgIds the message ids to return received +%% from {@link rabbit_fifo:delivery/0.} +%% @param State the {@module} state +%% @returns +%% `{ok | slow, State}' if the command was successfully sent. If the return +%% tag is `slow' it means the limit is approaching and it is time to slow down +%% the sending rate. +%% +-spec return(rabbit_fifo:consumer_tag(), [rabbit_fifo:msg_id()], state()) -> + {state(), list()}. +return(ConsumerTag, [_|_] = MsgIds, #state{slow = false} = State0) -> + Node = pick_server(State0), + % TODO: make rabbit_fifo return support lists of message ids + Cmd = rabbit_fifo:make_return(consumer_id(ConsumerTag), MsgIds), + case send_command(Node, undefined, Cmd, normal, State0) of + {_, S} -> + {S, []} + end; +return(ConsumerTag, [_|_] = MsgIds, + #state{unsent_commands = Unsent0} = State0) -> + ConsumerId = consumer_id(ConsumerTag), + %% we've reached the soft limit so will stash the command to be + %% sent once we have seen enough notifications + Unsent = maps:update_with(ConsumerId, + fun ({Settles, Returns, Discards}) -> + {Settles, Returns ++ MsgIds, Discards} + end, {[], MsgIds, []}, Unsent0), + {State0#state{unsent_commands = Unsent}, []}. + +%% @doc Discards a checked out message. +%% If the queue has a dead_letter_handler configured this will be called. +%% @param ConsumerTag the tag uniquely identifying the consumer. +%% @param MsgIds the message ids to discard +%% from {@link rabbit_fifo:delivery/0.} +%% @param State the {@module} state +%% @returns +%% `{ok | slow, State}' if the command was successfully sent. If the return +%% tag is `slow' it means the limit is approaching and it is time to slow down +%% the sending rate. +-spec discard(rabbit_fifo:consumer_tag(), [rabbit_fifo:msg_id()], state()) -> + {state(), list()}. +discard(ConsumerTag, [_|_] = MsgIds, #state{slow = false} = State0) -> + Node = pick_server(State0), + Cmd = rabbit_fifo:make_discard(consumer_id(ConsumerTag), MsgIds), + case send_command(Node, undefined, Cmd, normal, State0) of + {_, S} -> + % turn slow into ok for this function + {S, []} + end; +discard(ConsumerTag, [_|_] = MsgIds, + #state{unsent_commands = Unsent0} = State0) -> + ConsumerId = consumer_id(ConsumerTag), + %% we've reached the soft limit so will stash the command to be + %% sent once we have seen enough notifications + Unsent = maps:update_with(ConsumerId, + fun ({Settles, Returns, Discards}) -> + {Settles, Returns, Discards ++ MsgIds} + end, {[], [], MsgIds}, Unsent0), + {State0#state{unsent_commands = Unsent}, []}. + +%% @doc Register with the rabbit_fifo queue to "checkout" messages as they +%% become available. +%% +%% This is a synchronous call. I.e. the call will block until the command +%% has been accepted by the ra process or it times out. +%% +%% @param ConsumerTag a unique tag to identify this particular consumer. +%% @param NumUnsettled the maximum number of in-flight messages. Once this +%% number of messages has been received but not settled no further messages +%% will be delivered to the consumer. +%% @param CreditMode The credit mode to use for the checkout. +%% simple_prefetch: credit is auto topped up as deliveries are settled +%% credited: credit is only increased by sending credit to the queue +%% @param State The {@module} state. +%% +%% @returns `{ok, State}' or `{error | timeout, term()}' +-spec checkout(rabbit_fifo:consumer_tag(), + NumUnsettled :: non_neg_integer(), + CreditMode :: rabbit_fifo:credit_mode(), + Meta :: rabbit_fifo:consumer_meta(), + state()) -> {ok, state()} | {error | timeout, term()}. +checkout(ConsumerTag, NumUnsettled, CreditMode, Meta, + #state{consumer_deliveries = CDels0} = State0) -> + Servers = sorted_servers(State0), + ConsumerId = {ConsumerTag, self()}, + Cmd = rabbit_fifo:make_checkout(ConsumerId, + {auto, NumUnsettled, CreditMode}, + Meta), + %% ??? + Ack = maps:get(ack, Meta, true), + + SDels = maps:update_with(ConsumerTag, + fun (V) -> + V#consumer{ack = Ack} + end, + #consumer{last_msg_id = -1, + ack = Ack}, CDels0), + try_process_command(Servers, Cmd, State0#state{consumer_deliveries = SDels}). + +%% @doc Provide credit to the queue +%% +%% This only has an effect if the consumer uses credit mode: credited +%% @param ConsumerTag a unique tag to identify this particular consumer. +%% @param Credit the amount of credit to provide to theq queue +%% @param Drain tells the queue to use up any credit that cannot be immediately +%% fulfilled. (i.e. there are not enough messages on queue to use up all the +%% provided credit). +-spec credit(rabbit_fifo:consumer_tag(), + Credit :: non_neg_integer(), + Drain :: boolean(), + state()) -> + {state(), actions()}. +credit(ConsumerTag, Credit, Drain, + #state{consumer_deliveries = CDels} = State0) -> + ConsumerId = consumer_id(ConsumerTag), + %% the last received msgid provides us with the delivery count if we + %% add one as it is 0 indexed + C = maps:get(ConsumerTag, CDels, #consumer{last_msg_id = -1}), + Node = pick_server(State0), + Cmd = rabbit_fifo:make_credit(ConsumerId, Credit, + C#consumer.last_msg_id + 1, Drain), + case send_command(Node, undefined, Cmd, normal, State0) of + {_, S} -> + % turn slow into ok for this function + {S, []} + end. + +%% @doc Cancels a checkout with the rabbit_fifo queue for the consumer tag +%% +%% This is a synchronous call. I.e. the call will block until the command +%% has been accepted by the ra process or it times out. +%% +%% @param ConsumerTag a unique tag to identify this particular consumer. +%% @param State The {@module} state. +%% +%% @returns `{ok, State}' or `{error | timeout, term()}' +-spec cancel_checkout(rabbit_fifo:consumer_tag(), state()) -> + {ok, state()} | {error | timeout, term()}. +cancel_checkout(ConsumerTag, #state{consumer_deliveries = CDels} = State0) -> + Servers = sorted_servers(State0), + ConsumerId = {ConsumerTag, self()}, + Cmd = rabbit_fifo:make_checkout(ConsumerId, cancel, #{}), + State = State0#state{consumer_deliveries = maps:remove(ConsumerTag, CDels)}, + try_process_command(Servers, Cmd, State). + +%% @doc Purges all the messages from a rabbit_fifo queue and returns the number +%% of messages purged. +-spec purge(ra:server_id()) -> {ok, non_neg_integer()} | {error | timeout, term()}. +purge(Node) -> + case ra:process_command(Node, rabbit_fifo:make_purge()) of + {ok, {purge, Reply}, _} -> + {ok, Reply}; + Err -> + Err + end. + +-spec pending_size(state()) -> non_neg_integer(). +pending_size(#state{pending = Pend}) -> + maps:size(Pend). + +-spec stat(ra:server_id()) -> + {ok, non_neg_integer(), non_neg_integer()} + | {error | timeout, term()}. +stat(Leader) -> + %% short timeout as we don't want to spend too long if it is going to + %% fail anyway + stat(Leader, 250). + +-spec stat(ra:server_id(), non_neg_integer()) -> + {ok, non_neg_integer(), non_neg_integer()} + | {error | timeout, term()}. +stat(Leader, Timeout) -> + %% short timeout as we don't want to spend too long if it is going to + %% fail anyway + case ra:local_query(Leader, fun rabbit_fifo:query_stat/1, Timeout) of + {ok, {_, {R, C}}, _} -> {ok, R, C}; + {error, _} = Error -> Error; + {timeout, _} = Error -> Error + end. + +%% @doc returns the cluster name +-spec cluster_name(state()) -> cluster_name(). +cluster_name(#state{cfg = #cfg{cluster_name = ClusterName}}) -> + ClusterName. + +update_machine_state(Server, Conf) -> + case ra:process_command(Server, rabbit_fifo:make_update_config(Conf)) of + {ok, ok, _} -> + ok; + Err -> + Err + end. + +%% @doc Handles incoming `ra_events'. Events carry both internal "bookeeping" +%% events emitted by the `ra' leader as well as `rabbit_fifo' emitted events such +%% as message deliveries. All ra events need to be handled by {@module} +%% to ensure bookeeping, resends and flow control is correctly handled. +%% +%% If the `ra_event' contains a `rabbit_fifo' generated message it will be returned +%% for further processing. +%% +%% Example: +%% +%% ``` +%% receive +%% {ra_event, From, Evt} -> +%% case rabbit_fifo_client:handle_ra_event(From, Evt, State0) of +%% {internal, _Seq, State} -> State; +%% {{delivery, _ConsumerTag, Msgs}, State} -> +%% handle_messages(Msgs), +%% ... +%% end +%% end +%% ''' +%% +%% @param From the {@link ra:server_id().} of the sending process. +%% @param Event the body of the `ra_event'. +%% @param State the current {@module} state. +%% +%% @returns +%% `{internal, AppliedCorrelations, State}' if the event contained an internally +%% handled event such as a notification and a correlation was included with +%% the command (e.g. in a call to `enqueue/3' the correlation terms are returned +%% here. +%% +%% `{RaFifoEvent, State}' if the event contained a client message generated by +%% the `rabbit_fifo' state machine such as a delivery. +%% +%% The type of `rabbit_fifo' client messages that can be received are: +%% +%% `{delivery, ConsumerTag, [{MsgId, {MsgHeader, Msg}}]}' +%% +%% <li>`ConsumerTag' the binary tag passed to {@link checkout/3.}</li> +%% <li>`MsgId' is a consumer scoped monotonically incrementing id that can be +%% used to {@link settle/3.} (roughly: AMQP 0.9.1 ack) message once finished +%% with them.</li> +-spec handle_ra_event(ra:server_id(), ra_server_proc:ra_event_body(), state()) -> + {internal, Correlators :: [term()], actions(), state()} | + {rabbit_fifo:client_msg(), state()} | eol. +handle_ra_event(From, {applied, Seqs}, + #state{cfg = #cfg{cluster_name = QRef, + soft_limit = SftLmt, + unblock_handler = UnblockFun}} = State0) -> + + {Corrs, Actions0, State1} = lists:foldl(fun seq_applied/2, + {[], [], State0#state{leader = From}}, + Seqs), + Actions = case Corrs of + [] -> + lists:reverse(Actions0); + _ -> + [{settled, QRef, Corrs} + | lists:reverse(Actions0)] + end, + case maps:size(State1#state.pending) < SftLmt of + true when State1#state.slow == true -> + % we have exited soft limit state + % send any unsent commands and cancel the time as + % TODO: really the timer should only be cancelled when the channel + % exits flow state (which depends on the state of all queues the + % channel is interacting with) + % but the fact the queue has just applied suggests + % it's ok to cancel here anyway + State2 = cancel_timer(State1#state{slow = false, + unsent_commands = #{}}), + % build up a list of commands to issue + Commands = maps:fold( + fun (Cid, {Settled, Returns, Discards}, Acc) -> + add_command(Cid, settle, Settled, + add_command(Cid, return, Returns, + add_command(Cid, discard, + Discards, Acc))) + end, [], State1#state.unsent_commands), + Node = pick_server(State2), + %% send all the settlements and returns + State = lists:foldl(fun (C, S0) -> + case send_command(Node, undefined, + C, normal, S0) of + {T, S} when T =/= error -> + S + end + end, State2, Commands), + UnblockFun(), + {ok, State, Actions}; + _ -> + {ok, State1, Actions} + end; +handle_ra_event(From, {machine, {delivery, _ConsumerTag, _} = Del}, State0) -> + handle_delivery(From, Del, State0); +handle_ra_event(_, {machine, {queue_status, Status}}, + #state{} = State) -> + %% just set the queue status + {ok, State#state{queue_status = Status}, []}; +handle_ra_event(Leader, {machine, leader_change}, + #state{leader = Leader} = State) -> + %% leader already known + {ok, State, []}; +handle_ra_event(Leader, {machine, leader_change}, State0) -> + %% we need to update leader + %% and resend any pending commands + State = resend_all_pending(State0#state{leader = Leader}), + {ok, State, []}; +handle_ra_event(_From, {rejected, {not_leader, undefined, _Seq}}, State0) -> + % TODO: how should these be handled? re-sent on timer or try random + {ok, State0, []}; +handle_ra_event(_From, {rejected, {not_leader, Leader, Seq}}, State0) -> + State1 = State0#state{leader = Leader}, + State = resend(Seq, State1), + {ok, State, []}; +handle_ra_event(_, timeout, #state{cfg = #cfg{servers = Servers}} = State0) -> + case find_leader(Servers) of + undefined -> + %% still no leader, set the timer again + {ok, set_timer(State0), []}; + Leader -> + State = resend_all_pending(State0#state{leader = Leader}), + {ok, State, []} + end; +handle_ra_event(_Leader, {machine, eol}, _State0) -> + eol. + +%% @doc Attempts to enqueue a message using cast semantics. This provides no +%% guarantees or retries if the message fails to achieve consensus or if the +%% servers sent to happens not to be available. If the message is sent to a +%% follower it will attempt the deliver it to the leader, if known. Else it will +%% drop the messages. +%% +%% NB: only use this for non-critical enqueues where a full rabbit_fifo_client state +%% cannot be maintained. +%% +%% @param CusterId the cluster id. +%% @param Servers the known servers in the cluster. +%% @param Msg the message to enqueue. +%% +%% @returns `ok' +-spec untracked_enqueue([ra:server_id()], term()) -> + ok. +untracked_enqueue([Node | _], Msg) -> + Cmd = rabbit_fifo:make_enqueue(undefined, undefined, Msg), + ok = ra:pipeline_command(Node, Cmd), + ok. + +%% Internal + +try_process_command([Server | Rem], Cmd, State) -> + case ra:process_command(Server, Cmd, 30000) of + {ok, _, Leader} -> + {ok, State#state{leader = Leader}}; + Err when length(Rem) =:= 0 -> + Err; + _ -> + try_process_command(Rem, Cmd, State) + end. + +seq_applied({Seq, MaybeAction}, + {Corrs, Actions0, #state{last_applied = Last} = State0}) + when Seq > Last -> + State1 = do_resends(Last+1, Seq-1, State0), + {Actions, State} = maybe_add_action(MaybeAction, Actions0, State1), + case maps:take(Seq, State#state.pending) of + {{undefined, _}, Pending} -> + {Corrs, Actions, State#state{pending = Pending, + last_applied = Seq}}; + {{Corr, _}, Pending} -> + {[Corr | Corrs], Actions, State#state{pending = Pending, + last_applied = Seq}}; + error -> + % must have already been resent or removed for some other reason + % still need to update last_applied or we may inadvertently resend + % stuff later + {Corrs, Actions, State#state{last_applied = Seq}} + end; +seq_applied(_Seq, Acc) -> + Acc. + +maybe_add_action(ok, Acc, State) -> + {Acc, State}; +maybe_add_action({multi, Actions}, Acc0, State0) -> + lists:foldl(fun (Act, {Acc, State}) -> + maybe_add_action(Act, Acc, State) + end, {Acc0, State0}, Actions); +maybe_add_action({send_drained, {Tag, Credit}} = Action, Acc, + #state{consumer_deliveries = CDels} = State) -> + %% add credit to consumer delivery_count + C = maps:get(Tag, CDels), + {[Action | Acc], + State#state{consumer_deliveries = + update_consumer(Tag, C#consumer.last_msg_id, + Credit, C, CDels)}}; +maybe_add_action(Action, Acc, State) -> + %% anything else is assumed to be an action + {[Action | Acc], State}. + +do_resends(From, To, State) when From =< To -> + % ?INFO("rabbit_fifo_client: doing resends From ~w To ~w~n", [From, To]), + lists:foldl(fun resend/2, State, lists:seq(From, To)); +do_resends(_, _, State) -> + State. + +% resends a command with a new sequence number +resend(OldSeq, #state{pending = Pending0, leader = Leader} = State) -> + case maps:take(OldSeq, Pending0) of + {{Corr, Cmd}, Pending} -> + %% resends aren't subject to flow control here + resend_command(Leader, Corr, Cmd, State#state{pending = Pending}); + error -> + State + end. + +resend_all_pending(#state{pending = Pend} = State) -> + Seqs = lists:sort(maps:keys(Pend)), + lists:foldl(fun resend/2, State, Seqs). + +maybe_auto_ack(true, Deliver, State0) -> + %% manual ack is enabled + {ok, State0, [Deliver]}; +maybe_auto_ack(false, {deliver, Tag, _Ack, Msgs} = Deliver, State0) -> + %% we have to auto ack these deliveries + MsgIds = [I || {_, _, I, _, _} <- Msgs], + {State, Actions} = settle(Tag, MsgIds, State0), + {ok, State, [Deliver] ++ Actions}. + + +handle_delivery(Leader, {delivery, Tag, [{FstId, _} | _] = IdMsgs}, + #state{cfg = #cfg{cluster_name = QName}, + consumer_deliveries = CDels0} = State0) -> + QRef = qref(Leader), + {LastId, _} = lists:last(IdMsgs), + Consumer = #consumer{ack = Ack} = maps:get(Tag, CDels0), + %% format as a deliver action + Del = {deliver, Tag, Ack, transform_msgs(QName, QRef, IdMsgs)}, + %% TODO: remove potential default allocation + case Consumer of + #consumer{last_msg_id = Prev} = C + when FstId =:= Prev+1 -> + maybe_auto_ack(Ack, Del, + State0#state{consumer_deliveries = + update_consumer(Tag, LastId, + length(IdMsgs), C, + CDels0)}); + #consumer{last_msg_id = Prev} = C + when FstId > Prev+1 -> + NumMissing = FstId - Prev + 1, + %% there may actually be fewer missing messages returned than expected + %% This can happen when a node the channel is on gets disconnected + %% from the node the leader is on and then reconnected afterwards. + %% When the node is disconnected the leader will return all checked + %% out messages to the main queue to ensure they don't get stuck in + %% case the node never comes back. + case get_missing_deliveries(Leader, Prev+1, FstId-1, Tag) of + {protocol_error, _, _, _} = Err -> + Err; + Missing -> + XDel = {deliver, Tag, Ack, transform_msgs(QName, QRef, + Missing ++ IdMsgs)}, + maybe_auto_ack(Ack, XDel, + State0#state{consumer_deliveries = + update_consumer(Tag, LastId, + length(IdMsgs) + NumMissing, + C, CDels0)}) + end; + #consumer{last_msg_id = Prev} + when FstId =< Prev -> + case lists:dropwhile(fun({Id, _}) -> Id =< Prev end, IdMsgs) of + [] -> + {ok, State0, []}; + IdMsgs2 -> + handle_delivery(Leader, {delivery, Tag, IdMsgs2}, State0) + end; + C when FstId =:= 0 -> + % the very first delivery + maybe_auto_ack(Ack, Del, + State0#state{consumer_deliveries = + update_consumer(Tag, LastId, + length(IdMsgs), + C#consumer{last_msg_id = LastId}, + CDels0)}) + end. + +transform_msgs(QName, QRef, Msgs) -> + lists:map( + fun({MsgId, {MsgHeader, Msg0}}) -> + {Msg, Redelivered} = case MsgHeader of + #{delivery_count := C} -> + {add_delivery_count_header(Msg0, C), true}; + _ -> + {Msg0, false} + end, + {QName, QRef, MsgId, Redelivered, Msg} + end, Msgs). + +update_consumer(Tag, LastId, DelCntIncr, + #consumer{delivery_count = D} = C, Consumers) -> + maps:put(Tag, + C#consumer{last_msg_id = LastId, + delivery_count = D + DelCntIncr}, + Consumers). + + +get_missing_deliveries(Leader, From, To, ConsumerTag) -> + ConsumerId = consumer_id(ConsumerTag), + % ?INFO("get_missing_deliveries for ~w from ~b to ~b", + % [ConsumerId, From, To]), + Query = fun (State) -> + rabbit_fifo:get_checked_out(ConsumerId, From, To, State) + end, + case ra:local_query(Leader, Query) of + {ok, {_, Missing}, _} -> + Missing; + {error, Error} -> + {protocol_error, internal_error, "Cannot query missing deliveries from ~p: ~p", + [Leader, Error]}; + {timeout, _} -> + {protocol_error, internal_error, "Cannot query missing deliveries from ~p: timeout", + [Leader]} + end. + +pick_server(#state{leader = undefined, + cfg = #cfg{servers = [N | _]}}) -> + %% TODO: pick random rather that first? + N; +pick_server(#state{leader = Leader}) -> + Leader. + +% servers sorted by last known leader +sorted_servers(#state{leader = undefined, + cfg = #cfg{servers = Servers}}) -> + Servers; +sorted_servers(#state{leader = Leader, + cfg = #cfg{servers = Servers}}) -> + [Leader | lists:delete(Leader, Servers)]. + +next_seq(#state{next_seq = Seq} = State) -> + {Seq, State#state{next_seq = Seq + 1}}. + +next_enqueue_seq(#state{next_enqueue_seq = Seq} = State) -> + {Seq, State#state{next_enqueue_seq = Seq + 1}}. + +consumer_id(ConsumerTag) -> + {ConsumerTag, self()}. + +send_command(Server, Correlation, Command, Priority, + #state{pending = Pending, + cfg = #cfg{soft_limit = SftLmt}} = State0) -> + {Seq, State} = next_seq(State0), + ok = ra:pipeline_command(Server, Command, Seq, Priority), + Tag = case maps:size(Pending) >= SftLmt of + true -> slow; + false -> ok + end, + {Tag, State#state{pending = Pending#{Seq => {Correlation, Command}}, + slow = Tag == slow}}. + +resend_command(Node, Correlation, Command, + #state{pending = Pending} = State0) -> + {Seq, State} = next_seq(State0), + ok = ra:pipeline_command(Node, Command, Seq), + State#state{pending = Pending#{Seq => {Correlation, Command}}}. + +add_command(_, _, [], Acc) -> + Acc; +add_command(Cid, settle, MsgIds, Acc) -> + [rabbit_fifo:make_settle(Cid, MsgIds) | Acc]; +add_command(Cid, return, MsgIds, Acc) -> + [rabbit_fifo:make_return(Cid, MsgIds) | Acc]; +add_command(Cid, discard, MsgIds, Acc) -> + [rabbit_fifo:make_discard(Cid, MsgIds) | Acc]. + +set_timer(#state{leader = Leader0, + cfg = #cfg{servers = [Server | _], + cluster_name = QName}} = State) -> + Leader = case Leader0 of + undefined -> Server; + _ -> + Leader0 + end, + Ref = erlang:send_after(?TIMER_TIME, self(), + {'$gen_cast', + {queue_event, QName, {Leader, timeout}}}), + State#state{timer_state = Ref}. + +cancel_timer(#state{timer_state = undefined} = State) -> + State; +cancel_timer(#state{timer_state = Ref} = State) -> + erlang:cancel_timer(Ref, [{async, true}, {info, false}]), + State#state{timer_state = undefined}. + +find_leader([]) -> + undefined; +find_leader([Server | Servers]) -> + case ra:members(Server, 500) of + {ok, _, Leader} -> Leader; + _ -> + find_leader(Servers) + end. + +qref({Ref, _}) -> Ref; +qref(Ref) -> Ref. |