summaryrefslogtreecommitdiff
path: root/deps/rabbit/src/rabbit_fifo_client.erl
diff options
context:
space:
mode:
Diffstat (limited to 'deps/rabbit/src/rabbit_fifo_client.erl')
-rw-r--r--deps/rabbit/src/rabbit_fifo_client.erl888
1 files changed, 888 insertions, 0 deletions
diff --git a/deps/rabbit/src/rabbit_fifo_client.erl b/deps/rabbit/src/rabbit_fifo_client.erl
new file mode 100644
index 0000000000..3990222b15
--- /dev/null
+++ b/deps/rabbit/src/rabbit_fifo_client.erl
@@ -0,0 +1,888 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+%% @doc Provides an easy to consume API for interacting with the {@link rabbit_fifo.}
+%% state machine implementation running inside a `ra' raft system.
+%%
+%% Handles command tracking and other non-functional concerns.
+-module(rabbit_fifo_client).
+
+-export([
+ init/2,
+ init/3,
+ init/5,
+ checkout/5,
+ cancel_checkout/2,
+ enqueue/2,
+ enqueue/3,
+ dequeue/3,
+ settle/3,
+ return/3,
+ discard/3,
+ credit/4,
+ handle_ra_event/3,
+ untracked_enqueue/2,
+ purge/1,
+ cluster_name/1,
+ update_machine_state/2,
+ pending_size/1,
+ stat/1,
+ stat/2
+ ]).
+
+-include_lib("rabbit_common/include/rabbit.hrl").
+
+%% Default soft limit: once this many commands are pending (sent but not yet
+%% reported as applied) send_command/5 starts returning `slow'.
+-define(SOFT_LIMIT, 32).
+%% Delay in milliseconds passed to erlang:send_after/3 by set_timer/1.
+-define(TIMER_TIME, 10000).
+
+%% Client-side sequence number attached to each pipelined command.
+-type seq() :: non_neg_integer().
+%% last_applied is initialised to -1, hence a plain integer() rather than seq()
+-type maybe_seq() :: integer().
+-type action() :: {send_credit_reply, Available :: non_neg_integer()} |
+ {send_drained, CTagCredit ::
+ {rabbit_fifo:consumer_tag(), non_neg_integer()}}.
+-type actions() :: [action()].
+
+-type cluster_name() :: rabbit_types:r(queue).
+
+%% Per consumer-tag delivery tracking.
+%% last_msg_id: id of the last message seen for this consumer; set to -1 by
+%% checkout/5 before any delivery and advanced by update_consumer/5.
+%% ack: taken from the `ack' key of the checkout meta map; when false
+%% deliveries are settled automatically (see maybe_auto_ack/3).
+%% delivery_count: running total incremented by update_consumer/5.
+-record(consumer, {last_msg_id :: seq() | -1,
+ ack = false :: boolean(),
+ delivery_count = 0 :: non_neg_integer()}).
+
+%% Static configuration captured by init/3 and init/5.
+%% timeout is stored in milliseconds (net_ticktime + 5 seconds, times 1000).
+%% block_handler is invoked when the soft limit is first exceeded and
+%% unblock_handler when the pending count drops back below it.
+-record(cfg, {cluster_name :: cluster_name(),
+ servers = [] :: [ra:server_id()],
+ soft_limit = ?SOFT_LIMIT :: non_neg_integer(),
+ block_handler = fun() -> ok end :: fun(() -> term()),
+ unblock_handler = fun() -> ok end :: fun(() -> ok),
+ timeout :: non_neg_integer(),
+ version = 0 :: non_neg_integer()}).
+
+%% Mutable client state threaded through every API call.
+-record(state, {cfg :: #cfg{},
+ leader :: undefined | ra:server_id(),
+ queue_status :: undefined | go | reject_publish,
+ next_seq = 0 :: seq(),
+ %% Last applied is initialised to -1 to note that no command has yet been
+ %% applied, while still allowing messages to be resent if the first ones in
+ %% the sequence are lost (messages are resent from last_applied + 1)
+ last_applied = -1 :: maybe_seq(),
+ next_enqueue_seq = 1 :: seq(),
+ %% indicates that we've exceeded the soft limit
+ slow = false :: boolean(),
+ %% per consumer id: {Settles, Returns, Discards} message ids stashed
+ %% while `slow' is true; flushed by handle_ra_event/3
+ unsent_commands = #{} :: #{rabbit_fifo:consumer_id() =>
+ {[seq()], [seq()], [seq()]}},
+ %% commands sent but not yet applied: Seq => {Correlation, Command}
+ pending = #{} :: #{seq() =>
+ {term(), rabbit_fifo:command()}},
+ consumer_deliveries = #{} :: #{rabbit_fifo:consumer_tag() =>
+ #consumer{}},
+ %% timer reference returned by erlang:send_after/3, or undefined
+ timer_state :: term()
+ }).
+
+-opaque state() :: #state{}.
+
+-export_type([
+ state/0,
+ actions/0
+ ]).
+
+
+%% @doc Builds the initial client state for a new rabbit_fifo session using
+%% the default soft limit (`?SOFT_LIMIT').
+%% @param ClusterName the queue resource identifying the cluster.
+%% @param Servers the known servers of the queue. If the current leader is
+%% known it should be placed at the head of the list.
+-spec init(cluster_name(), [ra:server_id()]) -> state().
+init(ClusterName, Servers) ->
+    DefaultSoftLimit = ?SOFT_LIMIT,
+    init(ClusterName, Servers, DefaultSoftLimit).
+
+%% @doc Builds the initial client state for a new rabbit_fifo session with an
+%% explicit soft limit. The block/unblock handlers keep their record defaults.
+%% @param ClusterName the queue resource identifying the cluster.
+%% @param Servers the known servers of the queue. If the current leader is
+%% known it should be placed at the head of the list.
+%% @param SoftLimit the number of pending commands at which the client
+%% starts reporting `slow'.
+-spec init(cluster_name(), [ra:server_id()], non_neg_integer()) -> state().
+init(#resource{} = ClusterName, Servers, SoftLimit) ->
+    %% net_ticktime is expressed in seconds, the stored timeout in milliseconds
+    TickTimeSecs = application:get_env(kernel, net_ticktime, 60),
+    TimeoutMs = (TickTimeSecs + 5) * 1000,
+    Cfg = #cfg{cluster_name = ClusterName,
+               servers = Servers,
+               soft_limit = SoftLimit,
+               timeout = TimeoutMs},
+    #state{cfg = Cfg}.
+
+%% @doc Builds the initial client state with an explicit soft limit and
+%% caller-supplied flow control callbacks.
+%% @param BlockFun called when the soft limit is first exceeded.
+%% @param UnblockFun called when the pending count drops below the limit.
+-spec init(cluster_name(), [ra:server_id()], non_neg_integer(), fun(() -> ok),
+           fun(() -> ok)) -> state().
+init(#resource{} = ClusterName, Servers, SoftLimit, BlockFun, UnblockFun) ->
+    %% net_ticktime is expressed in seconds, the stored timeout in milliseconds
+    TickTimeSecs = application:get_env(kernel, net_ticktime, 60),
+    TimeoutMs = (TickTimeSecs + 5) * 1000,
+    Cfg = #cfg{cluster_name = ClusterName,
+               servers = Servers,
+               soft_limit = SoftLimit,
+               block_handler = BlockFun,
+               unblock_handler = UnblockFun,
+               timeout = TimeoutMs},
+    #state{cfg = Cfg}.
+
+
+%% @doc Enqueues a message.
+%% @param Correlation an arbitrary erlang term used to correlate this
+%% command when it has been applied.
+%% @param Msg an arbitrary erlang term representing the message.
+%% @param State the current {@module} state.
+%% @returns
+%% `{ok | slow | reject_publish, State}'. `ok' and `slow' mean the command was
+%% pipelined; `slow' additionally means the soft limit has been reached and it
+%% is time to slow down the sending rate. `reject_publish' means the message
+%% was NOT sent.
+%% {@module} assigns a sequence number to every raft command it issues. The
+%% SequenceNumber can be correlated to the applied sequence numbers returned
+%% by the {@link handle_ra_event/3. handle_ra_event/3} function.
+%%
+%% On the very first enqueue (queue_status undefined, next_enqueue_seq 1) this
+%% call is synchronous: it performs an rpc to the chosen server and, for
+%% machine version 1, a blocking `ra:process_command/3'. It exits with the
+%% returned term if registering the enqueuer fails with anything other than a
+%% timeout.
+-spec enqueue(Correlation :: term(), Msg :: term(), State :: state()) ->
+ {ok | slow | reject_publish, state()}.
+enqueue(Correlation, Msg,
+ #state{queue_status = undefined,
+ next_enqueue_seq = 1,
+ cfg = #cfg{timeout = Timeout}} = State0) ->
+ %% it is the first enqueue, check the machine version on the target node
+ {_, Node} = Server = pick_server(State0),
+ %% NOTE(review): only `{badrpc, nodedown}' is matched below; any other
+ %% badrpc reason or a version other than 0 or 1 raises case_clause -
+ %% confirm this is intended
+ case rpc:call(Node, ra_machine, version, [{machine, rabbit_fifo, #{}}]) of
+ 0 ->
+ %% the leader is running the old version
+ %% so we can't initialize the enqueuer session safely
+ %% fall back on old behaviour
+ enqueue(Correlation, Msg, State0#state{queue_status = go});
+ 1 ->
+ %% we're running the new version on the leader, do sync initialisation
+ %% of enqueuer session
+ Reg = rabbit_fifo:make_register_enqueuer(self()),
+ case ra:process_command(Server, Reg, Timeout) of
+ {ok, reject_publish, _} ->
+ {reject_publish, State0#state{queue_status = reject_publish}};
+ {ok, ok, _} ->
+ enqueue(Correlation, Msg, State0#state{queue_status = go});
+ {timeout, _} ->
+ %% if we timeout it is probably better to reject
+ %% the message than being uncertain
+ {reject_publish, State0};
+ Err ->
+ exit(Err)
+ end;
+ {badrpc, nodedown} ->
+ %% state is left untouched so the next enqueue retries the check
+ {reject_publish, State0}
+ end;
+enqueue(_Correlation, _Msg,
+ #state{queue_status = reject_publish,
+ cfg = #cfg{}} = State) ->
+ %% the queue has told us to reject publishes: nothing is sent
+ {reject_publish, State};
+enqueue(Correlation, Msg,
+ #state{slow = Slow,
+ queue_status = go,
+ cfg = #cfg{block_handler = BlockFun}} = State0) ->
+ Node = pick_server(State0),
+ {Next, State1} = next_enqueue_seq(State0),
+ % by default there is no correlation id
+ Cmd = rabbit_fifo:make_enqueue(self(), Next, Msg),
+ case send_command(Node, Correlation, Cmd, low, State1) of
+ {slow, State} when not Slow ->
+ %% first time the soft limit is hit: notify the caller and arm
+ %% the timeout timer
+ BlockFun(),
+ {slow, set_timer(State)};
+ Any ->
+ Any
+ end.
+
+%% @doc Enqueues a message without a correlation term.
+%% Equivalent to `enqueue(undefined, Msg, State)'.
+%% @param Msg an arbitrary erlang term representing the message.
+%% @param State the current {@module} state.
+%% @returns
+%% `{ok | slow | reject_publish, State}'. If the return tag is `slow' the soft
+%% limit has been reached and it is time to slow down the sending rate; if it
+%% is `reject_publish' the message was not sent.
+-spec enqueue(Msg :: term(), State :: state()) ->
+    {ok | slow | reject_publish, state()}.
+enqueue(Msg, State) ->
+    NoCorrelation = undefined,
+    enqueue(NoCorrelation, Msg, State).
+
+%% @doc Dequeue a message from the queue.
+%%
+%% This is a synchronous call. I.e. the call will block until the command
+%% has been accepted by the ra process or it times out.
+%%
+%% @param ConsumerTag a unique tag to identify this particular consumer.
+%% @param Settlement either `settled' or `unsettled'. When `settled' no
+%% further settlement needs to be done.
+%% @param State The {@module} state.
+%%
+%% @returns
+%% `{ok, MsgsReady, {QName, QRef, MsgId, IsDelivered, Msg}, State}' when a
+%% message was dequeued, `{empty, State}' when the queue had no messages, or
+%% `{error | timeout, term()}' (without a state) when the command failed.
+-spec dequeue(rabbit_fifo:consumer_tag(),
+              Settlement :: settled | unsettled, state()) ->
+    {ok, non_neg_integer(), term(), state()}
+    | {empty, state()} | {error | timeout, term()}.
+dequeue(ConsumerTag, Settlement,
+        #state{cfg = #cfg{timeout = Timeout,
+                          cluster_name = QName}} = State0) ->
+    Node = pick_server(State0),
+    ConsumerId = consumer_id(ConsumerTag),
+    Cmd = rabbit_fifo:make_checkout(ConsumerId, {dequeue, Settlement}, #{}),
+    case ra:process_command(Node, Cmd, Timeout) of
+        {ok, {dequeue, empty}, Leader} ->
+            {empty, State0#state{leader = Leader}};
+        {ok, {dequeue, {MsgId, {MsgHeader, Msg0}}, MsgsReady}, Leader} ->
+            %% the header is matched rather than read with maps:get/3 so a
+            %% header without a delivery_count key yields 0
+            Count = case MsgHeader of
+                        #{delivery_count := C} -> C;
+                        _ -> 0
+                    end,
+            IsDelivered = Count > 0,
+            Msg = add_delivery_count_header(Msg0, Count),
+            {ok, MsgsReady,
+             {QName, qref(Leader), MsgId, IsDelivered, Msg},
+             State0#state{leader = Leader}};
+        {ok, {error, _} = Err, _Leader} ->
+            %% the machine replied with an error: unwrap it
+            Err;
+        Err ->
+            %% ra level error or timeout is passed through unchanged
+            Err
+    end.
+
+%% Adds the `x-delivery-count' header to a #basic_message{} when the count is
+%% an integer; any other message term (or count) is returned untouched.
+add_delivery_count_header(Msg, Count) ->
+    case is_record(Msg, basic_message) andalso is_integer(Count) of
+        true ->
+            rabbit_basic:add_header(<<"x-delivery-count">>, long, Count, Msg);
+        false ->
+            Msg
+    end.
+
+
+%% @doc Settle messages. Permanently removes the messages from the queue.
+%% @param ConsumerTag the tag uniquely identifying the consumer.
+%% @param MsgIds a non-empty list of message ids received with the
+%% {@link rabbit_fifo:delivery/0.}
+%% @param State the {@module} state
+%% @returns
+%% `{State, Actions}' where `Actions' is always the empty list. While the
+%% client is in the `slow' state the message ids are stashed and only sent
+%% once enough applied notifications have been seen.
+-spec settle(rabbit_fifo:consumer_tag(), [rabbit_fifo:msg_id()], state()) ->
+    {state(), list()}.
+settle(ConsumerTag, [_|_] = MsgIds,
+       #state{slow = Slow, unsent_commands = Unsent0} = State0) ->
+    ConsumerId = consumer_id(ConsumerTag),
+    case Slow of
+        false ->
+            Cmd = rabbit_fifo:make_settle(ConsumerId, MsgIds),
+            %% the ok | slow tag is deliberately ignored by this function
+            {_, State} = send_command(pick_server(State0), undefined, Cmd,
+                                      normal, State0),
+            {State, []};
+        _ ->
+            %% soft limit reached: stash the ids until the queue catches up
+            {Settles, Returns, Discards} =
+                maps:get(ConsumerId, Unsent0, {[], [], []}),
+            Unsent = Unsent0#{ConsumerId =>
+                                  {Settles ++ MsgIds, Returns, Discards}},
+            {State0#state{unsent_commands = Unsent}, []}
+    end.
+
+%% @doc Return messages to the queue.
+%% @param ConsumerTag the tag uniquely identifying the consumer.
+%% @param MsgIds a non-empty list of message ids to return, as received
+%% from {@link rabbit_fifo:delivery/0.}
+%% @param State the {@module} state
+%% @returns
+%% `{State, Actions}' where `Actions' is always the empty list. While the
+%% client is in the `slow' state the message ids are stashed and only sent
+%% once enough applied notifications have been seen.
+-spec return(rabbit_fifo:consumer_tag(), [rabbit_fifo:msg_id()], state()) ->
+    {state(), list()}.
+return(ConsumerTag, [_|_] = MsgIds,
+       #state{slow = Slow, unsent_commands = Unsent0} = State0) ->
+    ConsumerId = consumer_id(ConsumerTag),
+    case Slow of
+        false ->
+            % TODO: make rabbit_fifo return support lists of message ids
+            Cmd = rabbit_fifo:make_return(ConsumerId, MsgIds),
+            {_, State} = send_command(pick_server(State0), undefined, Cmd,
+                                      normal, State0),
+            {State, []};
+        _ ->
+            %% soft limit reached: stash the ids until the queue catches up
+            {Settles, Returns, Discards} =
+                maps:get(ConsumerId, Unsent0, {[], [], []}),
+            Unsent = Unsent0#{ConsumerId =>
+                                  {Settles, Returns ++ MsgIds, Discards}},
+            {State0#state{unsent_commands = Unsent}, []}
+    end.
+
+%% @doc Discards checked out messages.
+%% If the queue has a dead_letter_handler configured this will be called.
+%% @param ConsumerTag the tag uniquely identifying the consumer.
+%% @param MsgIds a non-empty list of message ids to discard, as received
+%% from {@link rabbit_fifo:delivery/0.}
+%% @param State the {@module} state
+%% @returns
+%% `{State, Actions}' where `Actions' is always the empty list. While the
+%% client is in the `slow' state the message ids are stashed and only sent
+%% once enough applied notifications have been seen.
+-spec discard(rabbit_fifo:consumer_tag(), [rabbit_fifo:msg_id()], state()) ->
+    {state(), list()}.
+discard(ConsumerTag, [_|_] = MsgIds,
+        #state{slow = Slow, unsent_commands = Unsent0} = State0) ->
+    ConsumerId = consumer_id(ConsumerTag),
+    case Slow of
+        false ->
+            Cmd = rabbit_fifo:make_discard(ConsumerId, MsgIds),
+            %% the ok | slow tag is deliberately ignored by this function
+            {_, State} = send_command(pick_server(State0), undefined, Cmd,
+                                      normal, State0),
+            {State, []};
+        _ ->
+            %% soft limit reached: stash the ids until the queue catches up
+            {Settles, Returns, Discards} =
+                maps:get(ConsumerId, Unsent0, {[], [], []}),
+            Unsent = Unsent0#{ConsumerId =>
+                                  {Settles, Returns, Discards ++ MsgIds}},
+            {State0#state{unsent_commands = Unsent}, []}
+    end.
+
+%% @doc Register with the rabbit_fifo queue to "checkout" messages as they
+%% become available.
+%%
+%% This is a synchronous call. I.e. the call will block until the command
+%% has been accepted by the ra process or it times out.
+%%
+%% @param ConsumerTag a unique tag to identify this particular consumer.
+%% @param NumUnsettled the maximum number of in-flight messages. Once this
+%% number of messages has been received but not settled no further messages
+%% will be delivered to the consumer.
+%% @param CreditMode The credit mode to use for the checkout.
+%%     simple_prefetch: credit is auto topped up as deliveries are settled
+%%     credited: credit is only increased by sending credit to the queue
+%% @param Meta consumer meta data; the optional `ack' key (default `true')
+%% selects whether deliveries need to be settled by the caller.
+%% @param State The {@module} state.
+%%
+%% @returns `{ok, State}' or `{error | timeout, term()}'
+-spec checkout(rabbit_fifo:consumer_tag(),
+               NumUnsettled :: non_neg_integer(),
+               CreditMode :: rabbit_fifo:credit_mode(),
+               Meta :: rabbit_fifo:consumer_meta(),
+               state()) -> {ok, state()} | {error | timeout, term()}.
+checkout(ConsumerTag, NumUnsettled, CreditMode, Meta,
+         #state{consumer_deliveries = CDels0} = State0) ->
+    Servers = sorted_servers(State0),
+    Cmd = rabbit_fifo:make_checkout(consumer_id(ConsumerTag),
+                                    {auto, NumUnsettled, CreditMode},
+                                    Meta),
+    Ack = maps:get(ack, Meta, true),
+    %% keep the delivery tracking of an already known consumer and only
+    %% refresh its ack mode; otherwise start tracking from scratch
+    Consumer = case maps:find(ConsumerTag, CDels0) of
+                   {ok, Existing} ->
+                       Existing#consumer{ack = Ack};
+                   error ->
+                       #consumer{last_msg_id = -1, ack = Ack}
+               end,
+    CDels = CDels0#{ConsumerTag => Consumer},
+    try_process_command(Servers, Cmd,
+                        State0#state{consumer_deliveries = CDels}).
+
+%% @doc Provide credit to the queue
+%%
+%% This only has an effect if the consumer uses credit mode: credited
+%% @param ConsumerTag a unique tag to identify this particular consumer.
+%% @param Credit the amount of credit to provide to the queue
+%% @param Drain tells the queue to use up any credit that cannot be immediately
+%% fulfilled. (i.e. there are not enough messages on queue to use up all the
+%% provided credit).
+%% @returns `{State, Actions}' where `Actions' is always the empty list.
+-spec credit(rabbit_fifo:consumer_tag(),
+             Credit :: non_neg_integer(),
+             Drain :: boolean(),
+             state()) ->
+    {state(), actions()}.
+credit(ConsumerTag, Credit, Drain,
+       #state{consumer_deliveries = CDels} = State0) ->
+    ConsumerId = consumer_id(ConsumerTag),
+    #consumer{last_msg_id = LastMsgId} =
+        maps:get(ConsumerTag, CDels, #consumer{last_msg_id = -1}),
+    Node = pick_server(State0),
+    %% message ids are 0 indexed so the last received id plus one is used as
+    %% the delivery count
+    Cmd = rabbit_fifo:make_credit(ConsumerId, Credit, LastMsgId + 1, Drain),
+    %% the ok | slow tag is deliberately ignored by this function
+    {_, State} = send_command(Node, undefined, Cmd, normal, State0),
+    {State, []}.
+
+%% @doc Cancels a checkout with the rabbit_fifo queue for the consumer tag
+%%
+%% This is a synchronous call. I.e. the call will block until the command
+%% has been accepted by the ra process or it times out. The local delivery
+%% tracking for the tag is dropped before the command is sent.
+%%
+%% @param ConsumerTag a unique tag to identify this particular consumer.
+%% @param State The {@module} state.
+%%
+%% @returns `{ok, State}' or `{error | timeout, term()}'
+-spec cancel_checkout(rabbit_fifo:consumer_tag(), state()) ->
+    {ok, state()} | {error | timeout, term()}.
+cancel_checkout(ConsumerTag, #state{consumer_deliveries = CDels0} = State0) ->
+    Servers = sorted_servers(State0),
+    Cmd = rabbit_fifo:make_checkout(consumer_id(ConsumerTag), cancel, #{}),
+    CDels = maps:remove(ConsumerTag, CDels0),
+    try_process_command(Servers, Cmd,
+                        State0#state{consumer_deliveries = CDels}).
+
+%% @doc Purges all the messages from a rabbit_fifo queue and returns the number
+%% of messages purged. Any reply other than a successful purge is returned
+%% to the caller unchanged.
+-spec purge(ra:server_id()) -> {ok, non_neg_integer()} | {error | timeout, term()}.
+purge(Node) ->
+    Result = ra:process_command(Node, rabbit_fifo:make_purge()),
+    case Result of
+        {ok, {purge, NumPurged}, _Leader} ->
+            {ok, NumPurged};
+        Other ->
+            Other
+    end.
+
+%% @doc Returns the number of commands that have been sent but not yet
+%% removed from the pending map.
+-spec pending_size(state()) -> non_neg_integer().
+pending_size(#state{} = State) ->
+    maps:size(State#state.pending).
+
+%% @doc Same as `stat/2' with a 250ms timeout. The timeout is short as we
+%% don't want to spend too long if the query is going to fail anyway.
+-spec stat(ra:server_id()) ->
+    {ok, non_neg_integer(), non_neg_integer()}
+    | {error | timeout, term()}.
+stat(Leader) ->
+    ShortTimeout = 250,
+    stat(Leader, ShortTimeout).
+
+%% @doc Runs `rabbit_fifo:query_stat/1' as a local query against the given
+%% server and returns the two counters it reports as `{ok, R, C}'.
+%% Errors and timeouts from the query are returned unchanged.
+-spec stat(ra:server_id(), non_neg_integer()) ->
+    {ok, non_neg_integer(), non_neg_integer()}
+    | {error | timeout, term()}.
+stat(Leader, Timeout) ->
+    QueryFun = fun rabbit_fifo:query_stat/1,
+    case ra:local_query(Leader, QueryFun, Timeout) of
+        {ok, {_, {Ready, Checked}}, _} ->
+            {ok, Ready, Checked};
+        {error, _} = Failure ->
+            Failure;
+        {timeout, _} = Failure ->
+            Failure
+    end.
+
+%% @doc Returns the cluster name the state was initialised with.
+-spec cluster_name(state()) -> cluster_name().
+cluster_name(#state{cfg = Cfg}) ->
+    #cfg{cluster_name = Name} = Cfg,
+    Name.
+
+%% Synchronously sends an update_config command built from `Conf' to
+%% `Server'. Returns `ok' on success, otherwise the unexpected reply is
+%% returned unchanged.
+update_machine_state(Server, Conf) ->
+    Cmd = rabbit_fifo:make_update_config(Conf),
+    case ra:process_command(Server, Cmd) of
+        {ok, ok, _Leader} ->
+            ok;
+        Other ->
+            Other
+    end.
+
+%% @doc Handles incoming `ra_events'. Events carry both internal "bookkeeping"
+%% events emitted by the `ra' leader as well as `rabbit_fifo' emitted events such
+%% as message deliveries. All ra events need to be handled by {@module}
+%% to ensure bookkeeping, resends and flow control is correctly handled.
+%%
+%% Example:
+%%
+%% ```
+%%  receive
+%%     {ra_event, From, Evt} ->
+%%         case rabbit_fifo_client:handle_ra_event(From, Evt, State0) of
+%%             {ok, State, Actions} ->
+%%                 handle_actions(Actions),
+%%                 ...
+%%             eol ->
+%%                 ...
+%%         end
+%%  end
+%% '''
+%%
+%% @param From the {@link ra:server_id().} of the sending process.
+%% @param Event the body of the `ra_event'.
+%% @param State the current {@module} state.
+%%
+%% @returns
+%% `{ok, State, Actions}' for every event handled by the clauses below except
+%% `{machine, eol}', for which the atom `eol' is returned.
+%%
+%% For `{applied, Seqs}' events `Actions' starts with
+%% `{settled, ClusterName, Correlations}' when any applied command carried a
+%% correlation term (e.g. from a call to `enqueue/3'), followed by any actions
+%% attached to the applied sequence numbers.
+%%
+%% For `{machine, {delivery, ConsumerTag, IdMsgs}}' events the result is
+%% whatever `handle_delivery/3' returns: normally `{ok, State, Actions}' with a
+%% `{deliver, ConsumerTag, Ack, Msgs}' action, or a
+%% `{protocol_error, Type, Fmt, Args}' tuple when missing deliveries could not
+%% be queried.
+%%
+%% <li>`ConsumerTag' the binary tag passed to {@link checkout/5.}</li>
+%% <li>`MsgId' is a consumer scoped monotonically incrementing id that can be
+%% used to {@link settle/3.} (roughly: AMQP 0.9.1 ack) message once finished
+%% with them.</li>
+%%
+%% NOTE(review): the -spec below still lists `{internal, ...}' and
+%% `{client_msg(), state()}' shapes which none of the visible clauses
+%% return - confirm and update the spec.
+-spec handle_ra_event(ra:server_id(), ra_server_proc:ra_event_body(), state()) ->
+ {internal, Correlators :: [term()], actions(), state()} |
+ {rabbit_fifo:client_msg(), state()} | eol.
+handle_ra_event(From, {applied, Seqs},
+ #state{cfg = #cfg{cluster_name = QRef,
+ soft_limit = SftLmt,
+ unblock_handler = UnblockFun}} = State0) ->
+
+ %% the sender of an applied event is recorded as the current leader
+ {Corrs, Actions0, State1} = lists:foldl(fun seq_applied/2,
+ {[], [], State0#state{leader = From}},
+ Seqs),
+ %% actions were accumulated in reverse order by the fold
+ Actions = case Corrs of
+ [] ->
+ lists:reverse(Actions0);
+ _ ->
+ [{settled, QRef, Corrs}
+ | lists:reverse(Actions0)]
+ end,
+ case maps:size(State1#state.pending) < SftLmt of
+ true when State1#state.slow == true ->
+ % we have exited soft limit state
+ % send any unsent commands and cancel the timer
+ % TODO: really the timer should only be cancelled when the channel
+ % exits flow state (which depends on the state of all queues the
+ % channel is interacting with)
+ % but the fact the queue has just applied suggests
+ % it's ok to cancel here anyway
+ State2 = cancel_timer(State1#state{slow = false,
+ unsent_commands = #{}}),
+ % build up a list of commands to issue from the commands stashed
+ % in State1 (State2 already has them cleared)
+ Commands = maps:fold(
+ fun (Cid, {Settled, Returns, Discards}, Acc) ->
+ add_command(Cid, settle, Settled,
+ add_command(Cid, return, Returns,
+ add_command(Cid, discard,
+ Discards, Acc)))
+ end, [], State1#state.unsent_commands),
+ Node = pick_server(State2),
+ %% send all the settlements and returns
+ State = lists:foldl(fun (C, S0) ->
+ case send_command(Node, undefined,
+ C, normal, S0) of
+ {T, S} when T =/= error ->
+ S
+ end
+ end, State2, Commands),
+ UnblockFun(),
+ {ok, State, Actions};
+ _ ->
+ {ok, State1, Actions}
+ end;
+handle_ra_event(From, {machine, {delivery, _ConsumerTag, _} = Del}, State0) ->
+ handle_delivery(From, Del, State0);
+handle_ra_event(_, {machine, {queue_status, Status}},
+ #state{} = State) ->
+ %% just set the queue status
+ {ok, State#state{queue_status = Status}, []};
+handle_ra_event(Leader, {machine, leader_change},
+ #state{leader = Leader} = State) ->
+ %% leader already known
+ {ok, State, []};
+handle_ra_event(Leader, {machine, leader_change}, State0) ->
+ %% we need to update leader
+ %% and resend any pending commands
+ State = resend_all_pending(State0#state{leader = Leader}),
+ {ok, State, []};
+handle_ra_event(_From, {rejected, {not_leader, undefined, _Seq}}, State0) ->
+ % TODO: how should these be handled? re-sent on timer or try random
+ {ok, State0, []};
+handle_ra_event(_From, {rejected, {not_leader, Leader, Seq}}, State0) ->
+ %% the rejecting server told us who the leader is: resend just this command
+ State1 = State0#state{leader = Leader},
+ State = resend(Seq, State1),
+ {ok, State, []};
+handle_ra_event(_, timeout, #state{cfg = #cfg{servers = Servers}} = State0) ->
+ %% the timer armed by set_timer/1 fired: look for a leader
+ case find_leader(Servers) of
+ undefined ->
+ %% still no leader, set the timer again
+ {ok, set_timer(State0), []};
+ Leader ->
+ State = resend_all_pending(State0#state{leader = Leader}),
+ {ok, State, []}
+ end;
+handle_ra_event(_Leader, {machine, eol}, _State0) ->
+ eol.
+
+%% @doc Attempts to enqueue a message using cast semantics. This provides no
+%% guarantees or retries if the message fails to achieve consensus or if the
+%% server sent to happens not to be available. If the message is sent to a
+%% follower it will attempt to deliver it to the leader, if known. Else it
+%% will drop the message.
+%%
+%% NB: only use this for non-critical enqueues where a full rabbit_fifo_client state
+%% cannot be maintained.
+%%
+%% @param Servers the known servers in the cluster; only the first one is
+%% used and the list must not be empty.
+%% @param Msg the message to enqueue.
+%%
+%% @returns `ok'
+-spec untracked_enqueue([ra:server_id()], term()) ->
+    ok.
+untracked_enqueue([FirstServer | _], Msg) ->
+    %% no enqueuer pid and no sequence number: the queue cannot track it
+    ok = ra:pipeline_command(FirstServer,
+                             rabbit_fifo:make_enqueue(undefined, undefined,
+                                                      Msg)).
+
+%% Internal
+
+%% Synchronously sends `Cmd' to each server in turn until one accepts it.
+%% Returns `{ok, State}' with the leader updated on success, or the error
+%% returned by the last server in the list when all of them fail.
+%% The server list must not be empty.
+try_process_command([Server | Rem], Cmd, State) ->
+    case ra:process_command(Server, Cmd, 30000) of
+        {ok, _, Leader} ->
+            {ok, State#state{leader = Leader}};
+        Err when Rem =:= [] ->
+            %% no servers left to try: surface the last error
+            Err;
+        _ ->
+            try_process_command(Rem, Cmd, State)
+    end.
+
+%% Fold function used by handle_ra_event/3 for each `{Seq, MaybeAction}' of an
+%% `applied' event. The accumulator is `{Correlations, Actions, State}'.
+%% Sequence numbers at or below last_applied are ignored (second clause).
+seq_applied({Seq, MaybeAction},
+ {Corrs, Actions0, #state{last_applied = Last} = State0})
+ when Seq > Last ->
+ %% anything still pending between the last applied seq and this one was
+ %% skipped by the queue: resend it (under new sequence numbers)
+ State1 = do_resends(Last+1, Seq-1, State0),
+ {Actions, State} = maybe_add_action(MaybeAction, Actions0, State1),
+ case maps:take(Seq, State#state.pending) of
+ {{undefined, _}, Pending} ->
+ %% applied command had no correlation term to report
+ {Corrs, Actions, State#state{pending = Pending,
+ last_applied = Seq}};
+ {{Corr, _}, Pending} ->
+ {[Corr | Corrs], Actions, State#state{pending = Pending,
+ last_applied = Seq}};
+ error ->
+ % must have already been resent or removed for some other reason
+ % still need to update last_applied or we may inadvertently resend
+ % stuff later
+ {Corrs, Actions, State#state{last_applied = Seq}}
+ end;
+seq_applied(_Seq, Acc) ->
+ Acc.
+
+%% Prepends the action carried by an applied reply (if any) to `Acc' and
+%% returns `{Acc, State}'. `ok' carries no action, `{multi, Actions}' is
+%% unpacked element by element and `send_drained' also updates the consumer's
+%% delivery count.
+maybe_add_action(ok, Acc, State) ->
+ {Acc, State};
+maybe_add_action({multi, Actions}, Acc0, State0) ->
+ lists:foldl(fun (Act, {Acc, State}) ->
+ maybe_add_action(Act, Acc, State)
+ end, {Acc0, State0}, Actions);
+maybe_add_action({send_drained, {Tag, Credit}} = Action, Acc,
+ #state{consumer_deliveries = CDels} = State) ->
+ %% add credit to consumer delivery_count; last_msg_id is left unchanged
+ %% NOTE(review): maps:get/2 raises if the tag is no longer tracked (e.g.
+ %% after cancel_checkout/2) - confirm this cannot happen
+ C = maps:get(Tag, CDels),
+ {[Action | Acc],
+ State#state{consumer_deliveries =
+ update_consumer(Tag, C#consumer.last_msg_id,
+ Credit, C, CDels)}};
+maybe_add_action(Action, Acc, State) ->
+ %% anything else is assumed to be an action
+ {[Action | Acc], State}.
+
+%% Resends every pending command whose sequence number lies in the inclusive
+%% range From..To; an empty range leaves the state untouched.
+do_resends(From, To, State) ->
+    case From =< To of
+        true ->
+            lists:foldl(fun resend/2, State, lists:seq(From, To));
+        false ->
+            State
+    end.
+
+% Resends the pending command stored under OldSeq (if any) to the current
+% leader under a new sequence number; the old entry is dropped.
+resend(OldSeq, #state{pending = Pending0, leader = Leader} = State) ->
+    case maps:find(OldSeq, Pending0) of
+        {ok, {Corr, Cmd}} ->
+            Remaining = maps:remove(OldSeq, Pending0),
+            %% resends aren't subject to flow control here
+            resend_command(Leader, Corr, Cmd,
+                           State#state{pending = Remaining});
+        error ->
+            State
+    end.
+
+%% Resends every pending command in ascending sequence number order.
+resend_all_pending(#state{pending = Pending} = State0) ->
+    OrderedSeqs = lists:sort(maps:keys(Pending)),
+    lists:foldl(fun (Seq, State) -> resend(Seq, State) end,
+                State0, OrderedSeqs).
+
+%% Wraps a deliver action in an `{ok, State, Actions}' result. When the
+%% consumer does not ack manually the delivered message ids are settled
+%% immediately.
+maybe_auto_ack(true, DeliverAction, State) ->
+    %% manual ack is enabled
+    {ok, State, [DeliverAction]};
+maybe_auto_ack(false, {deliver, Tag, _Ack, Msgs} = DeliverAction, State0) ->
+    %% we have to auto ack these deliveries
+    MsgIds = [MsgId || {_, _, MsgId, _, _} <- Msgs],
+    {State, SettleActions} = settle(Tag, MsgIds, State0),
+    {ok, State, [DeliverAction | SettleActions]}.
+
+
+%% Handles a `delivery' machine event for the consumer `Tag'.
+%%
+%% The first message id of the batch is compared with the last id recorded
+%% for the consumer:
+%%  - next expected id: the batch is delivered as is;
+%%  - a gap: the missing deliveries are queried from the leader and
+%%    prepended to the batch;
+%%  - already seen ids: those are dropped and the remainder (if any) is
+%%    handled again.
+%% These three cases cover every ordering of the first id relative to the
+%% last recorded id.
+%%
+%% Returns `{ok, State, Actions}' or the `{protocol_error, _, _, _}' tuple
+%% produced by get_missing_deliveries/4.
+handle_delivery(Leader, {delivery, Tag, [{FstId, _} | _] = IdMsgs},
+                #state{cfg = #cfg{cluster_name = QName},
+                       consumer_deliveries = CDels0} = State0) ->
+    QRef = qref(Leader),
+    {LastId, _} = lists:last(IdMsgs),
+    Consumer = #consumer{ack = Ack} = maps:get(Tag, CDels0),
+    case Consumer of
+        #consumer{last_msg_id = Prev} = C
+          when FstId =:= Prev+1 ->
+            %% in order delivery: format as a deliver action
+            Del = {deliver, Tag, Ack, transform_msgs(QName, QRef, IdMsgs)},
+            maybe_auto_ack(Ack, Del,
+                           State0#state{consumer_deliveries =
+                                        update_consumer(Tag, LastId,
+                                                        length(IdMsgs), C,
+                                                        CDels0)});
+        #consumer{last_msg_id = Prev} = C
+          when FstId > Prev+1 ->
+            %% the ids Prev+1 .. FstId-1 were never seen
+            NumMissing = FstId - (Prev + 1),
+            %% there may actually be fewer missing messages returned than expected
+            %% This can happen when a node the channel is on gets disconnected
+            %% from the node the leader is on and then reconnected afterwards.
+            %% When the node is disconnected the leader will return all checked
+            %% out messages to the main queue to ensure they don't get stuck in
+            %% case the node never comes back.
+            case get_missing_deliveries(Leader, Prev+1, FstId-1, Tag) of
+                {protocol_error, _, _, _} = Err ->
+                    Err;
+                Missing ->
+                    XDel = {deliver, Tag, Ack,
+                            transform_msgs(QName, QRef, Missing ++ IdMsgs)},
+                    maybe_auto_ack(Ack, XDel,
+                                   State0#state{consumer_deliveries =
+                                                update_consumer(Tag, LastId,
+                                                                length(IdMsgs) + NumMissing,
+                                                                C, CDels0)})
+            end;
+        #consumer{last_msg_id = Prev}
+          when FstId =< Prev ->
+            %% (part of) the batch was already delivered: skip the known ids
+            case lists:dropwhile(fun({Id, _}) -> Id =< Prev end, IdMsgs) of
+                [] ->
+                    {ok, State0, []};
+                IdMsgs2 ->
+                    handle_delivery(Leader, {delivery, Tag, IdMsgs2}, State0)
+            end
+    end.
+
+%% Converts `{MsgId, {MsgHeader, Msg}}' entries into
+%% `{QName, QRef, MsgId, Redelivered, Msg}' tuples. A message is flagged as
+%% redelivered, and gets the delivery count header added, when its header
+%% carries a `delivery_count' key.
+transform_msgs(QName, QRef, Msgs) ->
+    [transform_msg(QName, QRef, IdMsg) || IdMsg <- Msgs].
+
+%% Converts a single `{MsgId, {MsgHeader, Msg}}' entry.
+transform_msg(QName, QRef, {MsgId, {#{delivery_count := Count}, Msg}}) ->
+    {QName, QRef, MsgId, true, add_delivery_count_header(Msg, Count)};
+transform_msg(QName, QRef, {MsgId, {_MsgHeader, Msg}}) ->
+    {QName, QRef, MsgId, false, Msg}.
+
+%% Stores the consumer under `Tag' with its last message id replaced by
+%% `LastId' and its delivery count increased by `DelCntIncr'.
+update_consumer(Tag, LastId, DelCntIncr, #consumer{} = Consumer, Consumers) ->
+    NewCount = Consumer#consumer.delivery_count + DelCntIncr,
+    Updated = Consumer#consumer{last_msg_id = LastId,
+                                delivery_count = NewCount},
+    Consumers#{Tag => Updated}.
+
+
+%% Queries the given server for the messages checked out by this consumer
+%% with ids in the range From..To. Returns the list of messages or a
+%% `{protocol_error, internal_error, Format, Args}' tuple when the query
+%% fails or times out.
+get_missing_deliveries(Leader, From, To, ConsumerTag) ->
+    Cid = consumer_id(ConsumerTag),
+    QueryFun = fun (MacState) ->
+                       rabbit_fifo:get_checked_out(Cid, From, To, MacState)
+               end,
+    missing_deliveries_result(Leader, ra:local_query(Leader, QueryFun)).
+
+%% Maps the result of the local query onto the return value of
+%% get_missing_deliveries/4.
+missing_deliveries_result(_Leader, {ok, {_, Missing}, _}) ->
+    Missing;
+missing_deliveries_result(Leader, {error, Error}) ->
+    {protocol_error, internal_error, "Cannot query missing deliveries from ~p: ~p",
+     [Leader, Error]};
+missing_deliveries_result(Leader, {timeout, _}) ->
+    {protocol_error, internal_error, "Cannot query missing deliveries from ~p: timeout",
+     [Leader]}.
+
+%% Returns the server commands should be sent to: the known leader, or the
+%% first configured server while no leader is known.
+pick_server(#state{leader = Leader, cfg = Cfg}) ->
+    case {Leader, Cfg} of
+        {undefined, #cfg{servers = [First | _]}} ->
+            %% TODO: pick random rather that first?
+            First;
+        _ ->
+            Leader
+    end.
+
+% Returns the configured servers with the last known leader (if any) moved
+% to the head of the list.
+sorted_servers(#state{leader = Leader, cfg = #cfg{servers = Servers}}) ->
+    case Leader of
+        undefined ->
+            Servers;
+        _ ->
+            [Leader | lists:delete(Leader, Servers)]
+    end.
+
+%% Returns the next command sequence number and the state with the counter
+%% advanced by one.
+next_seq(#state{} = State) ->
+    Current = State#state.next_seq,
+    {Current, State#state{next_seq = Current + 1}}.
+
+%% Returns the next enqueue sequence number and the state with the counter
+%% advanced by one.
+next_enqueue_seq(#state{} = State) ->
+    Current = State#state.next_enqueue_seq,
+    {Current, State#state{next_enqueue_seq = Current + 1}}.
+
+%% Builds the consumer id for a tag: the tag paired with the calling process.
+consumer_id(ConsumerTag) ->
+    Self = self(),
+    {ConsumerTag, Self}.
+
+%% Pipelines a command to `Server' under a fresh sequence number and records
+%% it as pending. Returns `{slow, State}' when the number of commands that
+%% were already pending has reached the soft limit, otherwise `{ok, State}'.
+send_command(Server, Correlation, Command, Priority,
+             #state{pending = Pending0,
+                    cfg = #cfg{soft_limit = SoftLimit}} = State0) ->
+    {Seq, State1} = next_seq(State0),
+    ok = ra:pipeline_command(Server, Command, Seq, Priority),
+    %% the limit is checked against the pending count before this command
+    IsSlow = maps:size(Pending0) >= SoftLimit,
+    Tag = if
+              IsSlow -> slow;
+              true -> ok
+          end,
+    Pending = Pending0#{Seq => {Correlation, Command}},
+    {Tag, State1#state{pending = Pending, slow = IsSlow}}.
+
+%% Pipelines a previously sent command again under a fresh sequence number
+%% and records it as pending. The soft limit is not consulted.
+resend_command(Node, Correlation, Command, #state{} = State0) ->
+    {Seq, State1} = next_seq(State0),
+    ok = ra:pipeline_command(Node, Command, Seq),
+    Pending0 = State0#state.pending,
+    State1#state{pending = Pending0#{Seq => {Correlation, Command}}}.
+
+%% Prepends a settle, return or discard command for the given message ids to
+%% the accumulator; nothing is added when there are no message ids.
+add_command(_Cid, _Type, [], Acc) ->
+    Acc;
+add_command(Cid, Type, MsgIds, Acc)
+  when Type =:= settle; Type =:= return; Type =:= discard ->
+    Cmd = case Type of
+              settle -> rabbit_fifo:make_settle(Cid, MsgIds);
+              return -> rabbit_fifo:make_return(Cid, MsgIds);
+              discard -> rabbit_fifo:make_discard(Cid, MsgIds)
+          end,
+    [Cmd | Acc].
+
+%% Arms a timer that, after ?TIMER_TIME milliseconds, sends the calling
+%% process a gen_cast carrying a `timeout' queue event. The event is
+%% attributed to the known leader or, failing that, the first configured
+%% server. The timer reference is kept in the state.
+set_timer(#state{leader = Leader,
+                 cfg = #cfg{servers = [FirstServer | _],
+                            cluster_name = QName}} = State) ->
+    From = timeout_source(Leader, FirstServer),
+    Msg = {'$gen_cast', {queue_event, QName, {From, timeout}}},
+    TimerRef = erlang:send_after(?TIMER_TIME, self(), Msg),
+    State#state{timer_state = TimerRef}.
+
+%% Picks the server a timeout event is attributed to.
+timeout_source(undefined, FirstServer) ->
+    FirstServer;
+timeout_source(Leader, _FirstServer) ->
+    Leader.
+
+%% Cancels the timer held in the state, if any, without waiting for the
+%% result of the cancellation.
+cancel_timer(#state{timer_state = TimerRef} = State) ->
+    case TimerRef of
+        undefined ->
+            State;
+        _ ->
+            erlang:cancel_timer(TimerRef, [{async, true}, {info, false}]),
+            State#state{timer_state = undefined}
+    end.
+
+%% Asks each server in turn (500ms timeout each) for the cluster members and
+%% returns the leader reported by the first one that answers, or `undefined'
+%% when none does.
+find_leader([]) ->
+    undefined;
+find_leader([Candidate | Rest]) ->
+    Reply = ra:members(Candidate, 500),
+    case Reply of
+        {ok, _Members, Leader} ->
+            Leader;
+        _ ->
+            find_leader(Rest)
+    end.
+
+%% Returns the first element when given a 2-tuple (e.g. a `{Name, Node}'
+%% server id); any other term is returned unchanged.
+qref(Ref) ->
+    case Ref of
+        {Name, _} -> Name;
+        _ -> Ref
+    end.