summaryrefslogtreecommitdiff
path: root/src/rabbit_fifo_client.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_fifo_client.erl')
-rw-r--r--src/rabbit_fifo_client.erl920
1 files changed, 0 insertions, 920 deletions
diff --git a/src/rabbit_fifo_client.erl b/src/rabbit_fifo_client.erl
deleted file mode 100644
index 6673cadc93..0000000000
--- a/src/rabbit_fifo_client.erl
+++ /dev/null
@@ -1,920 +0,0 @@
-%% This Source Code Form is subject to the terms of the Mozilla Public
-%% License, v. 2.0. If a copy of the MPL was not distributed with this
-%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
-%%
-%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
-%%
-
-%% @doc Provides an easy to consume API for interacting with the {@link rabbit_fifo.}
-%% state machine implementation running inside a `ra' raft system.
-%%
-%% Handles command tracking and other non-functional concerns.
--module(rabbit_fifo_client).
-
--export([
- init/2,
- init/3,
- init/5,
- checkout/4,
- checkout/5,
- cancel_checkout/2,
- enqueue/2,
- enqueue/3,
- dequeue/3,
- settle/3,
- return/3,
- discard/3,
- credit/4,
- handle_ra_event/3,
- untracked_enqueue/2,
- purge/1,
- cluster_name/1,
- update_machine_state/2,
- pending_size/1,
- stat/1,
- stat/2
- ]).
-
--include_lib("rabbit_common/include/rabbit.hrl").
-
--define(SOFT_LIMIT, 32).
--define(TIMER_TIME, 10000).
-
--type seq() :: non_neg_integer().
-%% last_applied is initialised to -1
--type maybe_seq() :: integer().
--type action() :: {send_credit_reply, Available :: non_neg_integer()} |
- {send_drained, CTagCredit ::
- {rabbit_fifo:consumer_tag(), non_neg_integer()}}.
--type actions() :: [action()].
-
--type cluster_name() :: rabbit_types:r(queue).
-
--record(consumer, {last_msg_id :: seq() | -1,
- ack = false :: boolean(),
- delivery_count = 0 :: non_neg_integer()}).
-
--record(cfg, {cluster_name :: cluster_name(),
- servers = [] :: [ra:server_id()],
- soft_limit = ?SOFT_LIMIT :: non_neg_integer(),
- block_handler = fun() -> ok end :: fun(() -> term()),
- unblock_handler = fun() -> ok end :: fun(() -> ok),
- timeout :: non_neg_integer(),
- version = 0 :: non_neg_integer()}).
-
--record(state, {cfg :: #cfg{},
- leader :: undefined | ra:server_id(),
- queue_status :: undefined | go | reject_publish,
- next_seq = 0 :: seq(),
- %% Last applied is initialise to -1 to note that no command has yet been
- %% applied, but allowing to resend messages if the first ones on the sequence
- %% are lost (messages are sent from last_applied + 1)
- last_applied = -1 :: maybe_seq(),
- next_enqueue_seq = 1 :: seq(),
- %% indicates that we've exceeded the soft limit
- slow = false :: boolean(),
- unsent_commands = #{} :: #{rabbit_fifo:consumer_id() =>
- {[seq()], [seq()], [seq()]}},
- pending = #{} :: #{seq() =>
- {term(), rabbit_fifo:command()}},
- consumer_deliveries = #{} :: #{rabbit_fifo:consumer_tag() =>
- #consumer{}},
- timer_state :: term()
- }).
-
--opaque state() :: #state{}.
-
--export_type([
- state/0,
- actions/0
- ]).
-
-
-%% @doc Create the initial state for a new rabbit_fifo sessions. A state is needed
-%% to interact with a rabbit_fifo queue using @module.
-%% @param ClusterName the id of the cluster to interact with
-%% @param Servers The known servers of the queue. If the current leader is known
-%% ensure the leader node is at the head of the list.
--spec init(cluster_name(), [ra:server_id()]) -> state().
-init(ClusterName, Servers) ->
- init(ClusterName, Servers, ?SOFT_LIMIT).
-
-%% @doc Create the initial state for a new rabbit_fifo sessions. A state is needed
-%% to interact with a rabbit_fifo queue using @module.
-%% @param ClusterName the id of the cluster to interact with
-%% @param Servers The known servers of the queue. If the current leader is known
-%% ensure the leader node is at the head of the list.
-%% @param MaxPending size defining the max number of pending commands.
--spec init(cluster_name(), [ra:server_id()], non_neg_integer()) -> state().
-init(ClusterName = #resource{}, Servers, SoftLimit) ->
- Timeout = application:get_env(kernel, net_ticktime, 60) + 5,
- #state{cfg = #cfg{cluster_name = ClusterName,
- servers = Servers,
- soft_limit = SoftLimit,
- timeout = Timeout * 1000}}.
-
--spec init(cluster_name(), [ra:server_id()], non_neg_integer(), fun(() -> ok),
- fun(() -> ok)) -> state().
-init(ClusterName = #resource{}, Servers, SoftLimit, BlockFun, UnblockFun) ->
- %% net ticktime is in seconds
- Timeout = application:get_env(kernel, net_ticktime, 60) + 5,
- #state{cfg = #cfg{cluster_name = ClusterName,
- servers = Servers,
- block_handler = BlockFun,
- unblock_handler = UnblockFun,
- soft_limit = SoftLimit,
- timeout = Timeout * 1000}}.
-
-
-%% @doc Enqueues a message.
-%% @param Correlation an arbitrary erlang term used to correlate this
-%% command when it has been applied.
-%% @param Msg an arbitrary erlang term representing the message.
-%% @param State the current {@module} state.
-%% @returns
-%% `{ok | slow, State}' if the command was successfully sent. If the return
-%% tag is `slow' it means the limit is approaching and it is time to slow down
-%% the sending rate.
-%% {@module} assigns a sequence number to every raft command it issues. The
-%% SequenceNumber can be correlated to the applied sequence numbers returned
-%% by the {@link handle_ra_event/2. handle_ra_event/2} function.
--spec enqueue(Correlation :: term(), Msg :: term(), State :: state()) ->
- {ok | slow | reject_publish, state()}.
-enqueue(Correlation, Msg,
- #state{queue_status = undefined,
- next_enqueue_seq = 1,
- cfg = #cfg{timeout = Timeout}} = State0) ->
- %% it is the first enqueue, check the version
- {_, Node} = Server = pick_server(State0),
- case rpc:call(Node, ra_machine, version, [{machine, rabbit_fifo, #{}}]) of
- 0 ->
- %% the leader is running the old version
- %% so we can't initialize the enqueuer session safely
- %% fall back on old behavour
- enqueue(Correlation, Msg, State0#state{queue_status = go});
- 1 ->
- %% were running the new version on the leader do sync initialisation
- %% of enqueuer session
- Reg = rabbit_fifo:make_register_enqueuer(self()),
- case ra:process_command(Server, Reg, Timeout) of
- {ok, reject_publish, _} ->
- {reject_publish, State0#state{queue_status = reject_publish}};
- {ok, ok, _} ->
- enqueue(Correlation, Msg, State0#state{queue_status = go});
- {timeout, _} ->
- %% if we timeout it is probably better to reject
- %% the message than being uncertain
- {reject_publish, State0};
- Err ->
- exit(Err)
- end;
- {badrpc, nodedown} ->
- {reject_publish, State0}
- end;
-enqueue(_Correlation, _Msg,
- #state{queue_status = reject_publish,
- cfg = #cfg{}} = State) ->
- {reject_publish, State};
-enqueue(Correlation, Msg,
- #state{slow = Slow,
- queue_status = go,
- cfg = #cfg{block_handler = BlockFun}} = State0) ->
- Node = pick_server(State0),
- {Next, State1} = next_enqueue_seq(State0),
- % by default there is no correlation id
- Cmd = rabbit_fifo:make_enqueue(self(), Next, Msg),
- case send_command(Node, Correlation, Cmd, low, State1) of
- {slow, State} when not Slow ->
- BlockFun(),
- {slow, set_timer(State)};
- Any ->
- Any
- end.
-
-%% @doc Enqueues a message.
-%% @param Msg an arbitrary erlang term representing the message.
-%% @param State the current {@module} state.
-%% @returns
-%% `{ok | slow, State}' if the command was successfully sent. If the return
-%% tag is `slow' it means the limit is approaching and it is time to slow down
-%% the sending rate.
-%% {@module} assigns a sequence number to every raft command it issues. The
-%% SequenceNumber can be correlated to the applied sequence numbers returned
-%% by the {@link handle_ra_event/2. handle_ra_event/2} function.
-%%
--spec enqueue(Msg :: term(), State :: state()) ->
- {ok | slow | reject_publish, state()}.
-enqueue(Msg, State) ->
- enqueue(undefined, Msg, State).
-
-%% @doc Dequeue a message from the queue.
-%%
-%% This is a synchronous call. I.e. the call will block until the command
-%% has been accepted by the ra process or it times out.
-%%
-%% @param ConsumerTag a unique tag to identify this particular consumer.
-%% @param Settlement either `settled' or `unsettled'. When `settled' no
-%% further settlement needs to be done.
-%% @param State The {@module} state.
-%%
-%% @returns `{ok, IdMsg, State}' or `{error | timeout, term()}'
--spec dequeue(rabbit_fifo:consumer_tag(),
- Settlement :: settled | unsettled, state()) ->
- {ok, non_neg_integer(), term(), non_neg_integer()}
- | {empty, state()} | {error | timeout, term()}.
-dequeue(ConsumerTag, Settlement,
- #state{cfg = #cfg{timeout = Timeout,
- cluster_name = QName}} = State0) ->
- Node = pick_server(State0),
- ConsumerId = consumer_id(ConsumerTag),
- case ra:process_command(Node,
- rabbit_fifo:make_checkout(ConsumerId,
- {dequeue, Settlement},
- #{}),
- Timeout) of
- {ok, {dequeue, empty}, Leader} ->
- {empty, State0#state{leader = Leader}};
- {ok, {dequeue, {MsgId, {MsgHeader, Msg0}}, MsgsReady}, Leader} ->
- Count = case MsgHeader of
- #{delivery_count := C} -> C;
- _ -> 0
- end,
- IsDelivered = Count > 0,
- Msg = add_delivery_count_header(Msg0, Count),
- {ok, MsgsReady,
- {QName, qref(Leader), MsgId, IsDelivered, Msg},
- State0#state{leader = Leader}};
- {ok, {error, _} = Err, _Leader} ->
- Err;
- Err ->
- Err
- end.
-
-add_delivery_count_header(#basic_message{} = Msg0, Count)
- when is_integer(Count) ->
- rabbit_basic:add_header(<<"x-delivery-count">>, long, Count, Msg0);
-add_delivery_count_header(Msg, _Count) ->
- Msg.
-
-
-%% @doc Settle a message. Permanently removes message from the queue.
-%% @param ConsumerTag the tag uniquely identifying the consumer.
-%% @param MsgIds the message ids received with the {@link rabbit_fifo:delivery/0.}
-%% @param State the {@module} state
-%% @returns
-%% `{ok | slow, State}' if the command was successfully sent. If the return
-%% tag is `slow' it means the limit is approaching and it is time to slow down
-%% the sending rate.
-%%
--spec settle(rabbit_fifo:consumer_tag(), [rabbit_fifo:msg_id()], state()) ->
- {state(), list()}.
-settle(ConsumerTag, [_|_] = MsgIds, #state{slow = false} = State0) ->
- Node = pick_server(State0),
- Cmd = rabbit_fifo:make_settle(consumer_id(ConsumerTag), MsgIds),
- case send_command(Node, undefined, Cmd, normal, State0) of
- {_, S} ->
- % turn slow into ok for this function
- {S, []}
- end;
-settle(ConsumerTag, [_|_] = MsgIds,
- #state{unsent_commands = Unsent0} = State0) ->
- ConsumerId = consumer_id(ConsumerTag),
- %% we've reached the soft limit so will stash the command to be
- %% sent once we have seen enough notifications
- Unsent = maps:update_with(ConsumerId,
- fun ({Settles, Returns, Discards}) ->
- {Settles ++ MsgIds, Returns, Discards}
- end, {MsgIds, [], []}, Unsent0),
- {State0#state{unsent_commands = Unsent}, []}.
-
-%% @doc Return a message to the queue.
-%% @param ConsumerTag the tag uniquely identifying the consumer.
-%% @param MsgIds the message ids to return received
-%% from {@link rabbit_fifo:delivery/0.}
-%% @param State the {@module} state
-%% @returns
-%% `{ok | slow, State}' if the command was successfully sent. If the return
-%% tag is `slow' it means the limit is approaching and it is time to slow down
-%% the sending rate.
-%%
--spec return(rabbit_fifo:consumer_tag(), [rabbit_fifo:msg_id()], state()) ->
- {state(), list()}.
-return(ConsumerTag, [_|_] = MsgIds, #state{slow = false} = State0) ->
- Node = pick_server(State0),
- % TODO: make rabbit_fifo return support lists of message ids
- Cmd = rabbit_fifo:make_return(consumer_id(ConsumerTag), MsgIds),
- case send_command(Node, undefined, Cmd, normal, State0) of
- {_, S} ->
- {S, []}
- end;
-return(ConsumerTag, [_|_] = MsgIds,
- #state{unsent_commands = Unsent0} = State0) ->
- ConsumerId = consumer_id(ConsumerTag),
- %% we've reached the soft limit so will stash the command to be
- %% sent once we have seen enough notifications
- Unsent = maps:update_with(ConsumerId,
- fun ({Settles, Returns, Discards}) ->
- {Settles, Returns ++ MsgIds, Discards}
- end, {[], MsgIds, []}, Unsent0),
- {State0#state{unsent_commands = Unsent}, []}.
-
-%% @doc Discards a checked out message.
-%% If the queue has a dead_letter_handler configured this will be called.
-%% @param ConsumerTag the tag uniquely identifying the consumer.
-%% @param MsgIds the message ids to discard
-%% from {@link rabbit_fifo:delivery/0.}
-%% @param State the {@module} state
-%% @returns
-%% `{ok | slow, State}' if the command was successfully sent. If the return
-%% tag is `slow' it means the limit is approaching and it is time to slow down
-%% the sending rate.
--spec discard(rabbit_fifo:consumer_tag(), [rabbit_fifo:msg_id()], state()) ->
- {state(), list()}.
-discard(ConsumerTag, [_|_] = MsgIds, #state{slow = false} = State0) ->
- Node = pick_server(State0),
- Cmd = rabbit_fifo:make_discard(consumer_id(ConsumerTag), MsgIds),
- case send_command(Node, undefined, Cmd, normal, State0) of
- {_, S} ->
- % turn slow into ok for this function
- {S, []}
- end;
-discard(ConsumerTag, [_|_] = MsgIds,
- #state{unsent_commands = Unsent0} = State0) ->
- ConsumerId = consumer_id(ConsumerTag),
- %% we've reached the soft limit so will stash the command to be
- %% sent once we have seen enough notifications
- Unsent = maps:update_with(ConsumerId,
- fun ({Settles, Returns, Discards}) ->
- {Settles, Returns, Discards ++ MsgIds}
- end, {[], [], MsgIds}, Unsent0),
- {State0#state{unsent_commands = Unsent}, []}.
-
-
-%% @doc Register with the rabbit_fifo queue to "checkout" messages as they
-%% become available.
-%%
-%% This is a synchronous call. I.e. the call will block until the command
-%% has been accepted by the ra process or it times out.
-%%
-%% @param ConsumerTag a unique tag to identify this particular consumer.
-%% @param NumUnsettled the maximum number of in-flight messages. Once this
-%% number of messages has been received but not settled no further messages
-%% will be delivered to the consumer.
-%% @param State The {@module} state.
-%%
-%% @returns `{ok, State}' or `{error | timeout, term()}'
--spec checkout(rabbit_fifo:consumer_tag(), NumUnsettled :: non_neg_integer(),
- rabbit_fifo:consumer_meta(),
- state()) -> {ok, state()} | {error | timeout, term()}.
-checkout(ConsumerTag, NumUnsettled, ConsumerInfo, State0)
- when is_map(ConsumerInfo) ->
- checkout(ConsumerTag, NumUnsettled, get_credit_mode(ConsumerInfo), ConsumerInfo, State0).
-
-%% @doc Register with the rabbit_fifo queue to "checkout" messages as they
-%% become available.
-%%
-%% This is a synchronous call. I.e. the call will block until the command
-%% has been accepted by the ra process or it times out.
-%%
-%% @param ConsumerTag a unique tag to identify this particular consumer.
-%% @param NumUnsettled the maximum number of in-flight messages. Once this
-%% number of messages has been received but not settled no further messages
-%% will be delivered to the consumer.
-%% @param CreditMode The credit mode to use for the checkout.
-%% simple_prefetch: credit is auto topped up as deliveries are settled
-%% credited: credit is only increased by sending credit to the queue
-%% @param State The {@module} state.
-%%
-%% @returns `{ok, State}' or `{error | timeout, term()}'
--spec checkout(rabbit_fifo:consumer_tag(),
- NumUnsettled :: non_neg_integer(),
- CreditMode :: rabbit_fifo:credit_mode(),
- Meta :: rabbit_fifo:consumer_meta(),
- state()) -> {ok, state()} | {error | timeout, term()}.
-checkout(ConsumerTag, NumUnsettled, CreditMode, Meta,
- #state{consumer_deliveries = CDels0} = State0) ->
- Servers = sorted_servers(State0),
- ConsumerId = {ConsumerTag, self()},
- Cmd = rabbit_fifo:make_checkout(ConsumerId,
- {auto, NumUnsettled, CreditMode},
- Meta),
- %% ???
- Ack = maps:get(ack, Meta, true),
-
- SDels = maps:update_with(ConsumerTag,
- fun (V) ->
- V#consumer{ack = Ack}
- end,
- #consumer{last_msg_id = -1,
- ack = Ack}, CDels0),
- try_process_command(Servers, Cmd, State0#state{consumer_deliveries = SDels}).
-
-%% @doc Provide credit to the queue
-%%
-%% This only has an effect if the consumer uses credit mode: credited
-%% @param ConsumerTag a unique tag to identify this particular consumer.
-%% @param Credit the amount of credit to provide to theq queue
-%% @param Drain tells the queue to use up any credit that cannot be immediately
-%% fulfilled. (i.e. there are not enough messages on queue to use up all the
-%% provided credit).
--spec credit(rabbit_fifo:consumer_tag(),
- Credit :: non_neg_integer(),
- Drain :: boolean(),
- state()) ->
- {state(), actions()}.
-credit(ConsumerTag, Credit, Drain,
- #state{consumer_deliveries = CDels} = State0) ->
- ConsumerId = consumer_id(ConsumerTag),
- %% the last received msgid provides us with the delivery count if we
- %% add one as it is 0 indexed
- C = maps:get(ConsumerTag, CDels, #consumer{last_msg_id = -1}),
- Node = pick_server(State0),
- Cmd = rabbit_fifo:make_credit(ConsumerId, Credit,
- C#consumer.last_msg_id + 1, Drain),
- case send_command(Node, undefined, Cmd, normal, State0) of
- {_, S} ->
- % turn slow into ok for this function
- {S, []}
- end.
-
-%% @doc Cancels a checkout with the rabbit_fifo queue for the consumer tag
-%%
-%% This is a synchronous call. I.e. the call will block until the command
-%% has been accepted by the ra process or it times out.
-%%
-%% @param ConsumerTag a unique tag to identify this particular consumer.
-%% @param State The {@module} state.
-%%
-%% @returns `{ok, State}' or `{error | timeout, term()}'
--spec cancel_checkout(rabbit_fifo:consumer_tag(), state()) ->
- {ok, state()} | {error | timeout, term()}.
-cancel_checkout(ConsumerTag, #state{consumer_deliveries = CDels} = State0) ->
- Servers = sorted_servers(State0),
- ConsumerId = {ConsumerTag, self()},
- Cmd = rabbit_fifo:make_checkout(ConsumerId, cancel, #{}),
- State = State0#state{consumer_deliveries = maps:remove(ConsumerTag, CDels)},
- try_process_command(Servers, Cmd, State).
-
-%% @doc Purges all the messages from a rabbit_fifo queue and returns the number
-%% of messages purged.
--spec purge(ra:server_id()) -> {ok, non_neg_integer()} | {error | timeout, term()}.
-purge(Node) ->
- case ra:process_command(Node, rabbit_fifo:make_purge()) of
- {ok, {purge, Reply}, _} ->
- {ok, Reply};
- Err ->
- Err
- end.
-
--spec pending_size(state()) -> non_neg_integer().
-pending_size(#state{pending = Pend}) ->
- maps:size(Pend).
-
--spec stat(ra:server_id()) ->
- {ok, non_neg_integer(), non_neg_integer()}
- | {error | timeout, term()}.
-stat(Leader) ->
- %% short timeout as we don't want to spend too long if it is going to
- %% fail anyway
- stat(Leader, 250).
-
--spec stat(ra:server_id(), non_neg_integer()) ->
- {ok, non_neg_integer(), non_neg_integer()}
- | {error | timeout, term()}.
-stat(Leader, Timeout) ->
- %% short timeout as we don't want to spend too long if it is going to
- %% fail anyway
- case ra:local_query(Leader, fun rabbit_fifo:query_stat/1, Timeout) of
- {ok, {_, {R, C}}, _} -> {ok, R, C};
- {error, _} = Error -> Error;
- {timeout, _} = Error -> Error
- end.
-
-%% @doc returns the cluster name
--spec cluster_name(state()) -> cluster_name().
-cluster_name(#state{cfg = #cfg{cluster_name = ClusterName}}) ->
- ClusterName.
-
-update_machine_state(Server, Conf) ->
- case ra:process_command(Server, rabbit_fifo:make_update_config(Conf)) of
- {ok, ok, _} ->
- ok;
- Err ->
- Err
- end.
-
-%% @doc Handles incoming `ra_events'. Events carry both internal "bookeeping"
-%% events emitted by the `ra' leader as well as `rabbit_fifo' emitted events such
-%% as message deliveries. All ra events need to be handled by {@module}
-%% to ensure bookeeping, resends and flow control is correctly handled.
-%%
-%% If the `ra_event' contains a `rabbit_fifo' generated message it will be returned
-%% for further processing.
-%%
-%% Example:
-%%
-%% ```
-%% receive
-%% {ra_event, From, Evt} ->
-%% case rabbit_fifo_client:handle_ra_event(From, Evt, State0) of
-%% {internal, _Seq, State} -> State;
-%% {{delivery, _ConsumerTag, Msgs}, State} ->
-%% handle_messages(Msgs),
-%% ...
-%% end
-%% end
-%% '''
-%%
-%% @param From the {@link ra:server_id().} of the sending process.
-%% @param Event the body of the `ra_event'.
-%% @param State the current {@module} state.
-%%
-%% @returns
-%% `{internal, AppliedCorrelations, State}' if the event contained an internally
-%% handled event such as a notification and a correlation was included with
-%% the command (e.g. in a call to `enqueue/3' the correlation terms are returned
-%% here.
-%%
-%% `{RaFifoEvent, State}' if the event contained a client message generated by
-%% the `rabbit_fifo' state machine such as a delivery.
-%%
-%% The type of `rabbit_fifo' client messages that can be received are:
-%%
-%% `{delivery, ConsumerTag, [{MsgId, {MsgHeader, Msg}}]}'
-%%
-%% <li>`ConsumerTag' the binary tag passed to {@link checkout/3.}</li>
-%% <li>`MsgId' is a consumer scoped monotonically incrementing id that can be
-%% used to {@link settle/3.} (roughly: AMQP 0.9.1 ack) message once finished
-%% with them.</li>
--spec handle_ra_event(ra:server_id(), ra_server_proc:ra_event_body(), state()) ->
- {internal, Correlators :: [term()], actions(), state()} |
- {rabbit_fifo:client_msg(), state()} | eol.
-handle_ra_event(From, {applied, Seqs},
- #state{cfg = #cfg{cluster_name = QRef,
- soft_limit = SftLmt,
- unblock_handler = UnblockFun}} = State0) ->
-
- {Corrs, Actions0, State1} = lists:foldl(fun seq_applied/2,
- {[], [], State0#state{leader = From}},
- Seqs),
- Actions = case Corrs of
- [] ->
- lists:reverse(Actions0);
- _ ->
- [{settled, QRef, Corrs}
- | lists:reverse(Actions0)]
- end,
- case maps:size(State1#state.pending) < SftLmt of
- true when State1#state.slow == true ->
- % we have exited soft limit state
- % send any unsent commands and cancel the time as
- % TODO: really the timer should only be cancelled when the channel
- % exits flow state (which depends on the state of all queues the
- % channel is interacting with)
- % but the fact the queue has just applied suggests
- % it's ok to cancel here anyway
- State2 = cancel_timer(State1#state{slow = false,
- unsent_commands = #{}}),
- % build up a list of commands to issue
- Commands = maps:fold(
- fun (Cid, {Settled, Returns, Discards}, Acc) ->
- add_command(Cid, settle, Settled,
- add_command(Cid, return, Returns,
- add_command(Cid, discard,
- Discards, Acc)))
- end, [], State1#state.unsent_commands),
- Node = pick_server(State2),
- %% send all the settlements and returns
- State = lists:foldl(fun (C, S0) ->
- case send_command(Node, undefined,
- C, normal, S0) of
- {T, S} when T =/= error ->
- S
- end
- end, State2, Commands),
- UnblockFun(),
- {ok, State, Actions};
- _ ->
- {ok, State1, Actions}
- end;
-handle_ra_event(From, {machine, {delivery, _ConsumerTag, _} = Del}, State0) ->
- handle_delivery(From, Del, State0);
-handle_ra_event(_, {machine, {queue_status, Status}},
- #state{} = State) ->
- %% just set the queue status
- {ok, State#state{queue_status = Status}, []};
-handle_ra_event(Leader, {machine, leader_change},
- #state{leader = Leader} = State) ->
- %% leader already known
- {ok, State, []};
-handle_ra_event(Leader, {machine, leader_change}, State0) ->
- %% we need to update leader
- %% and resend any pending commands
- State = resend_all_pending(State0#state{leader = Leader}),
- {ok, State, []};
-handle_ra_event(_From, {rejected, {not_leader, undefined, _Seq}}, State0) ->
- % TODO: how should these be handled? re-sent on timer or try random
- {ok, State0, []};
-handle_ra_event(_From, {rejected, {not_leader, Leader, Seq}}, State0) ->
- State1 = State0#state{leader = Leader},
- State = resend(Seq, State1),
- {ok, State, []};
-handle_ra_event(_, timeout, #state{cfg = #cfg{servers = Servers}} = State0) ->
- case find_leader(Servers) of
- undefined ->
- %% still no leader, set the timer again
- {ok, set_timer(State0), []};
- Leader ->
- State = resend_all_pending(State0#state{leader = Leader}),
- {ok, State, []}
- end;
-handle_ra_event(_Leader, {machine, eol}, _State0) ->
- eol.
-
-%% @doc Attempts to enqueue a message using cast semantics. This provides no
-%% guarantees or retries if the message fails to achieve consensus or if the
-%% servers sent to happens not to be available. If the message is sent to a
-%% follower it will attempt the deliver it to the leader, if known. Else it will
-%% drop the messages.
-%%
-%% NB: only use this for non-critical enqueues where a full rabbit_fifo_client state
-%% cannot be maintained.
-%%
-%% @param CusterId the cluster id.
-%% @param Servers the known servers in the cluster.
-%% @param Msg the message to enqueue.
-%%
-%% @returns `ok'
--spec untracked_enqueue([ra:server_id()], term()) ->
- ok.
-untracked_enqueue([Node | _], Msg) ->
- Cmd = rabbit_fifo:make_enqueue(undefined, undefined, Msg),
- ok = ra:pipeline_command(Node, Cmd),
- ok.
-
-%% Internal
-
-try_process_command([Server | Rem], Cmd, State) ->
- case ra:process_command(Server, Cmd, 30000) of
- {ok, _, Leader} ->
- {ok, State#state{leader = Leader}};
- Err when length(Rem) =:= 0 ->
- Err;
- _ ->
- try_process_command(Rem, Cmd, State)
- end.
-
-seq_applied({Seq, MaybeAction},
- {Corrs, Actions0, #state{last_applied = Last} = State0})
- when Seq > Last ->
- State1 = do_resends(Last+1, Seq-1, State0),
- {Actions, State} = maybe_add_action(MaybeAction, Actions0, State1),
- case maps:take(Seq, State#state.pending) of
- {{undefined, _}, Pending} ->
- {Corrs, Actions, State#state{pending = Pending,
- last_applied = Seq}};
- {{Corr, _}, Pending} ->
- {[Corr | Corrs], Actions, State#state{pending = Pending,
- last_applied = Seq}};
- error ->
- % must have already been resent or removed for some other reason
- % still need to update last_applied or we may inadvertently resend
- % stuff later
- {Corrs, Actions, State#state{last_applied = Seq}}
- end;
-seq_applied(_Seq, Acc) ->
- Acc.
-
-maybe_add_action(ok, Acc, State) ->
- {Acc, State};
-maybe_add_action({multi, Actions}, Acc0, State0) ->
- lists:foldl(fun (Act, {Acc, State}) ->
- maybe_add_action(Act, Acc, State)
- end, {Acc0, State0}, Actions);
-maybe_add_action({send_drained, {Tag, Credit}} = Action, Acc,
- #state{consumer_deliveries = CDels} = State) ->
- %% add credit to consumer delivery_count
- C = maps:get(Tag, CDels),
- {[Action | Acc],
- State#state{consumer_deliveries =
- update_consumer(Tag, C#consumer.last_msg_id,
- Credit, C, CDels)}};
-maybe_add_action(Action, Acc, State) ->
- %% anything else is assumed to be an action
- {[Action | Acc], State}.
-
-do_resends(From, To, State) when From =< To ->
- % ?INFO("rabbit_fifo_client: doing resends From ~w To ~w~n", [From, To]),
- lists:foldl(fun resend/2, State, lists:seq(From, To));
-do_resends(_, _, State) ->
- State.
-
-% resends a command with a new sequence number
-resend(OldSeq, #state{pending = Pending0, leader = Leader} = State) ->
- case maps:take(OldSeq, Pending0) of
- {{Corr, Cmd}, Pending} ->
- %% resends aren't subject to flow control here
- resend_command(Leader, Corr, Cmd, State#state{pending = Pending});
- error ->
- State
- end.
-
-resend_all_pending(#state{pending = Pend} = State) ->
- Seqs = lists:sort(maps:keys(Pend)),
- lists:foldl(fun resend/2, State, Seqs).
-
-maybe_auto_ack(true, Deliver, State0) ->
- %% manual ack is enabled
- {ok, State0, [Deliver]};
-maybe_auto_ack(false, {deliver, Tag, _Ack, Msgs} = Deliver, State0) ->
- %% we have to auto ack these deliveries
- MsgIds = [I || {_, _, I, _, _} <- Msgs],
- {State, Actions} = settle(Tag, MsgIds, State0),
- {ok, State, [Deliver] ++ Actions}.
-
-
-handle_delivery(Leader, {delivery, Tag, [{FstId, _} | _] = IdMsgs},
- #state{cfg = #cfg{cluster_name = QName},
- consumer_deliveries = CDels0} = State0) ->
- QRef = qref(Leader),
- {LastId, _} = lists:last(IdMsgs),
- Consumer = #consumer{ack = Ack} = maps:get(Tag, CDels0),
- %% format as a deliver action
- Del = {deliver, Tag, Ack, transform_msgs(QName, QRef, IdMsgs)},
- %% TODO: remove potential default allocation
- case Consumer of
- #consumer{last_msg_id = Prev} = C
- when FstId =:= Prev+1 ->
- maybe_auto_ack(Ack, Del,
- State0#state{consumer_deliveries =
- update_consumer(Tag, LastId,
- length(IdMsgs), C,
- CDels0)});
- #consumer{last_msg_id = Prev} = C
- when FstId > Prev+1 ->
- NumMissing = FstId - Prev + 1,
- %% there may actually be fewer missing messages returned than expected
- %% This can happen when a node the channel is on gets disconnected
- %% from the node the leader is on and then reconnected afterwards.
- %% When the node is disconnected the leader will return all checked
- %% out messages to the main queue to ensure they don't get stuck in
- %% case the node never comes back.
- case get_missing_deliveries(Leader, Prev+1, FstId-1, Tag) of
- {protocol_error, _, _, _} = Err ->
- Err;
- Missing ->
- XDel = {deliver, Tag, Ack, transform_msgs(QName, QRef,
- Missing ++ IdMsgs)},
- maybe_auto_ack(Ack, XDel,
- State0#state{consumer_deliveries =
- update_consumer(Tag, LastId,
- length(IdMsgs) + NumMissing,
- C, CDels0)})
- end;
- #consumer{last_msg_id = Prev}
- when FstId =< Prev ->
- case lists:dropwhile(fun({Id, _}) -> Id =< Prev end, IdMsgs) of
- [] ->
- {ok, State0, []};
- IdMsgs2 ->
- handle_delivery(Leader, {delivery, Tag, IdMsgs2}, State0)
- end;
- C when FstId =:= 0 ->
- % the very first delivery
- maybe_auto_ack(Ack, Del,
- State0#state{consumer_deliveries =
- update_consumer(Tag, LastId,
- length(IdMsgs),
- C#consumer{last_msg_id = LastId},
- CDels0)})
- end.
-
-transform_msgs(QName, QRef, Msgs) ->
- lists:map(
- fun({MsgId, {MsgHeader, Msg0}}) ->
- {Msg, Redelivered} = case MsgHeader of
- #{delivery_count := C} ->
- {add_delivery_count_header(Msg0, C), true};
- _ ->
- {Msg0, false}
- end,
- {QName, QRef, MsgId, Redelivered, Msg}
- end, Msgs).
-
-update_consumer(Tag, LastId, DelCntIncr,
- #consumer{delivery_count = D} = C, Consumers) ->
- maps:put(Tag,
- C#consumer{last_msg_id = LastId,
- delivery_count = D + DelCntIncr},
- Consumers).
-
-
-get_missing_deliveries(Leader, From, To, ConsumerTag) ->
- ConsumerId = consumer_id(ConsumerTag),
- % ?INFO("get_missing_deliveries for ~w from ~b to ~b",
- % [ConsumerId, From, To]),
- Query = fun (State) ->
- rabbit_fifo:get_checked_out(ConsumerId, From, To, State)
- end,
- case ra:local_query(Leader, Query) of
- {ok, {_, Missing}, _} ->
- Missing;
- {error, Error} ->
- {protocol_error, internal_error, "Cannot query missing deliveries from ~p: ~p",
- [Leader, Error]};
- {timeout, _} ->
- {protocol_error, internal_error, "Cannot query missing deliveries from ~p: timeout",
- [Leader]}
- end.
-
-pick_server(#state{leader = undefined,
- cfg = #cfg{servers = [N | _]}}) ->
- %% TODO: pick random rather that first?
- N;
-pick_server(#state{leader = Leader}) ->
- Leader.
-
-% servers sorted by last known leader
-sorted_servers(#state{leader = undefined,
- cfg = #cfg{servers = Servers}}) ->
- Servers;
-sorted_servers(#state{leader = Leader,
- cfg = #cfg{servers = Servers}}) ->
- [Leader | lists:delete(Leader, Servers)].
-
-next_seq(#state{next_seq = Seq} = State) ->
- {Seq, State#state{next_seq = Seq + 1}}.
-
-next_enqueue_seq(#state{next_enqueue_seq = Seq} = State) ->
- {Seq, State#state{next_enqueue_seq = Seq + 1}}.
-
-consumer_id(ConsumerTag) ->
- {ConsumerTag, self()}.
-
-send_command(Server, Correlation, Command, Priority,
- #state{pending = Pending,
- cfg = #cfg{soft_limit = SftLmt}} = State0) ->
- {Seq, State} = next_seq(State0),
- ok = ra:pipeline_command(Server, Command, Seq, Priority),
- Tag = case maps:size(Pending) >= SftLmt of
- true -> slow;
- false -> ok
- end,
- {Tag, State#state{pending = Pending#{Seq => {Correlation, Command}},
- slow = Tag == slow}}.
-
-resend_command(Node, Correlation, Command,
- #state{pending = Pending} = State0) ->
- {Seq, State} = next_seq(State0),
- ok = ra:pipeline_command(Node, Command, Seq),
- State#state{pending = Pending#{Seq => {Correlation, Command}}}.
-
-add_command(_, _, [], Acc) ->
- Acc;
-add_command(Cid, settle, MsgIds, Acc) ->
- [rabbit_fifo:make_settle(Cid, MsgIds) | Acc];
-add_command(Cid, return, MsgIds, Acc) ->
- [rabbit_fifo:make_return(Cid, MsgIds) | Acc];
-add_command(Cid, discard, MsgIds, Acc) ->
- [rabbit_fifo:make_discard(Cid, MsgIds) | Acc].
-
-set_timer(#state{leader = Leader0,
- cfg = #cfg{servers = [Server | _],
- cluster_name = QName}} = State) ->
- Leader = case Leader0 of
- undefined -> Server;
- _ ->
- Leader0
- end,
- Ref = erlang:send_after(?TIMER_TIME, self(),
- {'$gen_cast',
- {queue_event, QName, {Leader, timeout}}}),
- State#state{timer_state = Ref}.
-
-cancel_timer(#state{timer_state = undefined} = State) ->
- State;
-cancel_timer(#state{timer_state = Ref} = State) ->
- erlang:cancel_timer(Ref, [{async, true}, {info, false}]),
- State#state{timer_state = undefined}.
-
-find_leader([]) ->
- undefined;
-find_leader([Server | Servers]) ->
- case ra:members(Server, 500) of
- {ok, _, Leader} -> Leader;
- _ ->
- find_leader(Servers)
- end.
-
-qref({Ref, _}) -> Ref;
-qref(Ref) -> Ref.
-
-get_credit_mode(#{args := Args}) ->
- case rabbit_misc:table_lookup(Args, <<"x-credit">>) of
- {_Key, Value} ->
- Value;
- _ ->
- simple_prefetch
- end;
-get_credit_mode(_) ->
- simple_prefetch.