diff options
Diffstat (limited to 'src/rabbit_fifo_client.erl')
-rw-r--r-- | src/rabbit_fifo_client.erl | 920 |
1 files changed, 0 insertions, 920 deletions
diff --git a/src/rabbit_fifo_client.erl b/src/rabbit_fifo_client.erl deleted file mode 100644 index 6673cadc93..0000000000 --- a/src/rabbit_fifo_client.erl +++ /dev/null @@ -1,920 +0,0 @@ -%% This Source Code Form is subject to the terms of the Mozilla Public -%% License, v. 2.0. If a copy of the MPL was not distributed with this -%% file, You can obtain one at https://mozilla.org/MPL/2.0/. -%% -%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. -%% - -%% @doc Provides an easy to consume API for interacting with the {@link rabbit_fifo.} -%% state machine implementation running inside a `ra' raft system. -%% -%% Handles command tracking and other non-functional concerns. --module(rabbit_fifo_client). - --export([ - init/2, - init/3, - init/5, - checkout/4, - checkout/5, - cancel_checkout/2, - enqueue/2, - enqueue/3, - dequeue/3, - settle/3, - return/3, - discard/3, - credit/4, - handle_ra_event/3, - untracked_enqueue/2, - purge/1, - cluster_name/1, - update_machine_state/2, - pending_size/1, - stat/1, - stat/2 - ]). - --include_lib("rabbit_common/include/rabbit.hrl"). - --define(SOFT_LIMIT, 32). --define(TIMER_TIME, 10000). - --type seq() :: non_neg_integer(). -%% last_applied is initialised to -1 --type maybe_seq() :: integer(). --type action() :: {send_credit_reply, Available :: non_neg_integer()} | - {send_drained, CTagCredit :: - {rabbit_fifo:consumer_tag(), non_neg_integer()}}. --type actions() :: [action()]. - --type cluster_name() :: rabbit_types:r(queue). - --record(consumer, {last_msg_id :: seq() | -1, - ack = false :: boolean(), - delivery_count = 0 :: non_neg_integer()}). - --record(cfg, {cluster_name :: cluster_name(), - servers = [] :: [ra:server_id()], - soft_limit = ?SOFT_LIMIT :: non_neg_integer(), - block_handler = fun() -> ok end :: fun(() -> term()), - unblock_handler = fun() -> ok end :: fun(() -> ok), - timeout :: non_neg_integer(), - version = 0 :: non_neg_integer()}). - --record(state, {cfg :: #cfg{}, - leader :: undefined | ra:server_id(), - queue_status :: undefined | go | reject_publish, - next_seq = 0 :: seq(), - %% Last applied is initialise to -1 to note that no command has yet been - %% applied, but allowing to resend messages if the first ones on the sequence - %% are lost (messages are sent from last_applied + 1) - last_applied = -1 :: maybe_seq(), - next_enqueue_seq = 1 :: seq(), - %% indicates that we've exceeded the soft limit - slow = false :: boolean(), - unsent_commands = #{} :: #{rabbit_fifo:consumer_id() => - {[seq()], [seq()], [seq()]}}, - pending = #{} :: #{seq() => - {term(), rabbit_fifo:command()}}, - consumer_deliveries = #{} :: #{rabbit_fifo:consumer_tag() => - #consumer{}}, - timer_state :: term() - }). - --opaque state() :: #state{}. - --export_type([ - state/0, - actions/0 - ]). - - -%% @doc Create the initial state for a new rabbit_fifo sessions. A state is needed -%% to interact with a rabbit_fifo queue using @module. -%% @param ClusterName the id of the cluster to interact with -%% @param Servers The known servers of the queue. If the current leader is known -%% ensure the leader node is at the head of the list. --spec init(cluster_name(), [ra:server_id()]) -> state(). -init(ClusterName, Servers) -> - init(ClusterName, Servers, ?SOFT_LIMIT). - -%% @doc Create the initial state for a new rabbit_fifo sessions. A state is needed -%% to interact with a rabbit_fifo queue using @module. -%% @param ClusterName the id of the cluster to interact with -%% @param Servers The known servers of the queue. If the current leader is known -%% ensure the leader node is at the head of the list. -%% @param MaxPending size defining the max number of pending commands. --spec init(cluster_name(), [ra:server_id()], non_neg_integer()) -> state(). -init(ClusterName = #resource{}, Servers, SoftLimit) -> - Timeout = application:get_env(kernel, net_ticktime, 60) + 5, - #state{cfg = #cfg{cluster_name = ClusterName, - servers = Servers, - soft_limit = SoftLimit, - timeout = Timeout * 1000}}. - --spec init(cluster_name(), [ra:server_id()], non_neg_integer(), fun(() -> ok), - fun(() -> ok)) -> state(). -init(ClusterName = #resource{}, Servers, SoftLimit, BlockFun, UnblockFun) -> - %% net ticktime is in seconds - Timeout = application:get_env(kernel, net_ticktime, 60) + 5, - #state{cfg = #cfg{cluster_name = ClusterName, - servers = Servers, - block_handler = BlockFun, - unblock_handler = UnblockFun, - soft_limit = SoftLimit, - timeout = Timeout * 1000}}. - - -%% @doc Enqueues a message. -%% @param Correlation an arbitrary erlang term used to correlate this -%% command when it has been applied. -%% @param Msg an arbitrary erlang term representing the message. -%% @param State the current {@module} state. -%% @returns -%% `{ok | slow, State}' if the command was successfully sent. If the return -%% tag is `slow' it means the limit is approaching and it is time to slow down -%% the sending rate. -%% {@module} assigns a sequence number to every raft command it issues. The -%% SequenceNumber can be correlated to the applied sequence numbers returned -%% by the {@link handle_ra_event/2. handle_ra_event/2} function. --spec enqueue(Correlation :: term(), Msg :: term(), State :: state()) -> - {ok | slow | reject_publish, state()}. -enqueue(Correlation, Msg, - #state{queue_status = undefined, - next_enqueue_seq = 1, - cfg = #cfg{timeout = Timeout}} = State0) -> - %% it is the first enqueue, check the version - {_, Node} = Server = pick_server(State0), - case rpc:call(Node, ra_machine, version, [{machine, rabbit_fifo, #{}}]) of - 0 -> - %% the leader is running the old version - %% so we can't initialize the enqueuer session safely - %% fall back on old behavour - enqueue(Correlation, Msg, State0#state{queue_status = go}); - 1 -> - %% were running the new version on the leader do sync initialisation - %% of enqueuer session - Reg = rabbit_fifo:make_register_enqueuer(self()), - case ra:process_command(Server, Reg, Timeout) of - {ok, reject_publish, _} -> - {reject_publish, State0#state{queue_status = reject_publish}}; - {ok, ok, _} -> - enqueue(Correlation, Msg, State0#state{queue_status = go}); - {timeout, _} -> - %% if we timeout it is probably better to reject - %% the message than being uncertain - {reject_publish, State0}; - Err -> - exit(Err) - end; - {badrpc, nodedown} -> - {reject_publish, State0} - end; -enqueue(_Correlation, _Msg, - #state{queue_status = reject_publish, - cfg = #cfg{}} = State) -> - {reject_publish, State}; -enqueue(Correlation, Msg, - #state{slow = Slow, - queue_status = go, - cfg = #cfg{block_handler = BlockFun}} = State0) -> - Node = pick_server(State0), - {Next, State1} = next_enqueue_seq(State0), - % by default there is no correlation id - Cmd = rabbit_fifo:make_enqueue(self(), Next, Msg), - case send_command(Node, Correlation, Cmd, low, State1) of - {slow, State} when not Slow -> - BlockFun(), - {slow, set_timer(State)}; - Any -> - Any - end. - -%% @doc Enqueues a message. -%% @param Msg an arbitrary erlang term representing the message. -%% @param State the current {@module} state. -%% @returns -%% `{ok | slow, State}' if the command was successfully sent. If the return -%% tag is `slow' it means the limit is approaching and it is time to slow down -%% the sending rate. -%% {@module} assigns a sequence number to every raft command it issues. The -%% SequenceNumber can be correlated to the applied sequence numbers returned -%% by the {@link handle_ra_event/2. handle_ra_event/2} function. -%% --spec enqueue(Msg :: term(), State :: state()) -> - {ok | slow | reject_publish, state()}. -enqueue(Msg, State) -> - enqueue(undefined, Msg, State). - -%% @doc Dequeue a message from the queue. -%% -%% This is a synchronous call. I.e. the call will block until the command -%% has been accepted by the ra process or it times out. -%% -%% @param ConsumerTag a unique tag to identify this particular consumer. -%% @param Settlement either `settled' or `unsettled'. When `settled' no -%% further settlement needs to be done. -%% @param State The {@module} state. -%% -%% @returns `{ok, IdMsg, State}' or `{error | timeout, term()}' --spec dequeue(rabbit_fifo:consumer_tag(), - Settlement :: settled | unsettled, state()) -> - {ok, non_neg_integer(), term(), non_neg_integer()} - | {empty, state()} | {error | timeout, term()}. -dequeue(ConsumerTag, Settlement, - #state{cfg = #cfg{timeout = Timeout, - cluster_name = QName}} = State0) -> - Node = pick_server(State0), - ConsumerId = consumer_id(ConsumerTag), - case ra:process_command(Node, - rabbit_fifo:make_checkout(ConsumerId, - {dequeue, Settlement}, - #{}), - Timeout) of - {ok, {dequeue, empty}, Leader} -> - {empty, State0#state{leader = Leader}}; - {ok, {dequeue, {MsgId, {MsgHeader, Msg0}}, MsgsReady}, Leader} -> - Count = case MsgHeader of - #{delivery_count := C} -> C; - _ -> 0 - end, - IsDelivered = Count > 0, - Msg = add_delivery_count_header(Msg0, Count), - {ok, MsgsReady, - {QName, qref(Leader), MsgId, IsDelivered, Msg}, - State0#state{leader = Leader}}; - {ok, {error, _} = Err, _Leader} -> - Err; - Err -> - Err - end. - -add_delivery_count_header(#basic_message{} = Msg0, Count) - when is_integer(Count) -> - rabbit_basic:add_header(<<"x-delivery-count">>, long, Count, Msg0); -add_delivery_count_header(Msg, _Count) -> - Msg. - - -%% @doc Settle a message. Permanently removes message from the queue. -%% @param ConsumerTag the tag uniquely identifying the consumer. -%% @param MsgIds the message ids received with the {@link rabbit_fifo:delivery/0.} -%% @param State the {@module} state -%% @returns -%% `{ok | slow, State}' if the command was successfully sent. If the return -%% tag is `slow' it means the limit is approaching and it is time to slow down -%% the sending rate. -%% --spec settle(rabbit_fifo:consumer_tag(), [rabbit_fifo:msg_id()], state()) -> - {state(), list()}. -settle(ConsumerTag, [_|_] = MsgIds, #state{slow = false} = State0) -> - Node = pick_server(State0), - Cmd = rabbit_fifo:make_settle(consumer_id(ConsumerTag), MsgIds), - case send_command(Node, undefined, Cmd, normal, State0) of - {_, S} -> - % turn slow into ok for this function - {S, []} - end; -settle(ConsumerTag, [_|_] = MsgIds, - #state{unsent_commands = Unsent0} = State0) -> - ConsumerId = consumer_id(ConsumerTag), - %% we've reached the soft limit so will stash the command to be - %% sent once we have seen enough notifications - Unsent = maps:update_with(ConsumerId, - fun ({Settles, Returns, Discards}) -> - {Settles ++ MsgIds, Returns, Discards} - end, {MsgIds, [], []}, Unsent0), - {State0#state{unsent_commands = Unsent}, []}. - -%% @doc Return a message to the queue. -%% @param ConsumerTag the tag uniquely identifying the consumer. -%% @param MsgIds the message ids to return received -%% from {@link rabbit_fifo:delivery/0.} -%% @param State the {@module} state -%% @returns -%% `{ok | slow, State}' if the command was successfully sent. If the return -%% tag is `slow' it means the limit is approaching and it is time to slow down -%% the sending rate. -%% --spec return(rabbit_fifo:consumer_tag(), [rabbit_fifo:msg_id()], state()) -> - {state(), list()}. -return(ConsumerTag, [_|_] = MsgIds, #state{slow = false} = State0) -> - Node = pick_server(State0), - % TODO: make rabbit_fifo return support lists of message ids - Cmd = rabbit_fifo:make_return(consumer_id(ConsumerTag), MsgIds), - case send_command(Node, undefined, Cmd, normal, State0) of - {_, S} -> - {S, []} - end; -return(ConsumerTag, [_|_] = MsgIds, - #state{unsent_commands = Unsent0} = State0) -> - ConsumerId = consumer_id(ConsumerTag), - %% we've reached the soft limit so will stash the command to be - %% sent once we have seen enough notifications - Unsent = maps:update_with(ConsumerId, - fun ({Settles, Returns, Discards}) -> - {Settles, Returns ++ MsgIds, Discards} - end, {[], MsgIds, []}, Unsent0), - {State0#state{unsent_commands = Unsent}, []}. - -%% @doc Discards a checked out message. -%% If the queue has a dead_letter_handler configured this will be called. -%% @param ConsumerTag the tag uniquely identifying the consumer. -%% @param MsgIds the message ids to discard -%% from {@link rabbit_fifo:delivery/0.} -%% @param State the {@module} state -%% @returns -%% `{ok | slow, State}' if the command was successfully sent. If the return -%% tag is `slow' it means the limit is approaching and it is time to slow down -%% the sending rate. --spec discard(rabbit_fifo:consumer_tag(), [rabbit_fifo:msg_id()], state()) -> - {state(), list()}. -discard(ConsumerTag, [_|_] = MsgIds, #state{slow = false} = State0) -> - Node = pick_server(State0), - Cmd = rabbit_fifo:make_discard(consumer_id(ConsumerTag), MsgIds), - case send_command(Node, undefined, Cmd, normal, State0) of - {_, S} -> - % turn slow into ok for this function - {S, []} - end; -discard(ConsumerTag, [_|_] = MsgIds, - #state{unsent_commands = Unsent0} = State0) -> - ConsumerId = consumer_id(ConsumerTag), - %% we've reached the soft limit so will stash the command to be - %% sent once we have seen enough notifications - Unsent = maps:update_with(ConsumerId, - fun ({Settles, Returns, Discards}) -> - {Settles, Returns, Discards ++ MsgIds} - end, {[], [], MsgIds}, Unsent0), - {State0#state{unsent_commands = Unsent}, []}. - - -%% @doc Register with the rabbit_fifo queue to "checkout" messages as they -%% become available. -%% -%% This is a synchronous call. I.e. the call will block until the command -%% has been accepted by the ra process or it times out. -%% -%% @param ConsumerTag a unique tag to identify this particular consumer. -%% @param NumUnsettled the maximum number of in-flight messages. Once this -%% number of messages has been received but not settled no further messages -%% will be delivered to the consumer. -%% @param State The {@module} state. -%% -%% @returns `{ok, State}' or `{error | timeout, term()}' --spec checkout(rabbit_fifo:consumer_tag(), NumUnsettled :: non_neg_integer(), - rabbit_fifo:consumer_meta(), - state()) -> {ok, state()} | {error | timeout, term()}. -checkout(ConsumerTag, NumUnsettled, ConsumerInfo, State0) - when is_map(ConsumerInfo) -> - checkout(ConsumerTag, NumUnsettled, get_credit_mode(ConsumerInfo), ConsumerInfo, State0). - -%% @doc Register with the rabbit_fifo queue to "checkout" messages as they -%% become available. -%% -%% This is a synchronous call. I.e. the call will block until the command -%% has been accepted by the ra process or it times out. -%% -%% @param ConsumerTag a unique tag to identify this particular consumer. -%% @param NumUnsettled the maximum number of in-flight messages. Once this -%% number of messages has been received but not settled no further messages -%% will be delivered to the consumer. -%% @param CreditMode The credit mode to use for the checkout. -%% simple_prefetch: credit is auto topped up as deliveries are settled -%% credited: credit is only increased by sending credit to the queue -%% @param State The {@module} state. -%% -%% @returns `{ok, State}' or `{error | timeout, term()}' --spec checkout(rabbit_fifo:consumer_tag(), - NumUnsettled :: non_neg_integer(), - CreditMode :: rabbit_fifo:credit_mode(), - Meta :: rabbit_fifo:consumer_meta(), - state()) -> {ok, state()} | {error | timeout, term()}. -checkout(ConsumerTag, NumUnsettled, CreditMode, Meta, - #state{consumer_deliveries = CDels0} = State0) -> - Servers = sorted_servers(State0), - ConsumerId = {ConsumerTag, self()}, - Cmd = rabbit_fifo:make_checkout(ConsumerId, - {auto, NumUnsettled, CreditMode}, - Meta), - %% ??? - Ack = maps:get(ack, Meta, true), - - SDels = maps:update_with(ConsumerTag, - fun (V) -> - V#consumer{ack = Ack} - end, - #consumer{last_msg_id = -1, - ack = Ack}, CDels0), - try_process_command(Servers, Cmd, State0#state{consumer_deliveries = SDels}). - -%% @doc Provide credit to the queue -%% -%% This only has an effect if the consumer uses credit mode: credited -%% @param ConsumerTag a unique tag to identify this particular consumer. -%% @param Credit the amount of credit to provide to theq queue -%% @param Drain tells the queue to use up any credit that cannot be immediately -%% fulfilled. (i.e. there are not enough messages on queue to use up all the -%% provided credit). --spec credit(rabbit_fifo:consumer_tag(), - Credit :: non_neg_integer(), - Drain :: boolean(), - state()) -> - {state(), actions()}. -credit(ConsumerTag, Credit, Drain, - #state{consumer_deliveries = CDels} = State0) -> - ConsumerId = consumer_id(ConsumerTag), - %% the last received msgid provides us with the delivery count if we - %% add one as it is 0 indexed - C = maps:get(ConsumerTag, CDels, #consumer{last_msg_id = -1}), - Node = pick_server(State0), - Cmd = rabbit_fifo:make_credit(ConsumerId, Credit, - C#consumer.last_msg_id + 1, Drain), - case send_command(Node, undefined, Cmd, normal, State0) of - {_, S} -> - % turn slow into ok for this function - {S, []} - end. - -%% @doc Cancels a checkout with the rabbit_fifo queue for the consumer tag -%% -%% This is a synchronous call. I.e. the call will block until the command -%% has been accepted by the ra process or it times out. -%% -%% @param ConsumerTag a unique tag to identify this particular consumer. -%% @param State The {@module} state. -%% -%% @returns `{ok, State}' or `{error | timeout, term()}' --spec cancel_checkout(rabbit_fifo:consumer_tag(), state()) -> - {ok, state()} | {error | timeout, term()}. -cancel_checkout(ConsumerTag, #state{consumer_deliveries = CDels} = State0) -> - Servers = sorted_servers(State0), - ConsumerId = {ConsumerTag, self()}, - Cmd = rabbit_fifo:make_checkout(ConsumerId, cancel, #{}), - State = State0#state{consumer_deliveries = maps:remove(ConsumerTag, CDels)}, - try_process_command(Servers, Cmd, State). - -%% @doc Purges all the messages from a rabbit_fifo queue and returns the number -%% of messages purged. --spec purge(ra:server_id()) -> {ok, non_neg_integer()} | {error | timeout, term()}. -purge(Node) -> - case ra:process_command(Node, rabbit_fifo:make_purge()) of - {ok, {purge, Reply}, _} -> - {ok, Reply}; - Err -> - Err - end. - --spec pending_size(state()) -> non_neg_integer(). -pending_size(#state{pending = Pend}) -> - maps:size(Pend). - --spec stat(ra:server_id()) -> - {ok, non_neg_integer(), non_neg_integer()} - | {error | timeout, term()}. -stat(Leader) -> - %% short timeout as we don't want to spend too long if it is going to - %% fail anyway - stat(Leader, 250). - --spec stat(ra:server_id(), non_neg_integer()) -> - {ok, non_neg_integer(), non_neg_integer()} - | {error | timeout, term()}. -stat(Leader, Timeout) -> - %% short timeout as we don't want to spend too long if it is going to - %% fail anyway - case ra:local_query(Leader, fun rabbit_fifo:query_stat/1, Timeout) of - {ok, {_, {R, C}}, _} -> {ok, R, C}; - {error, _} = Error -> Error; - {timeout, _} = Error -> Error - end. - -%% @doc returns the cluster name --spec cluster_name(state()) -> cluster_name(). -cluster_name(#state{cfg = #cfg{cluster_name = ClusterName}}) -> - ClusterName. - -update_machine_state(Server, Conf) -> - case ra:process_command(Server, rabbit_fifo:make_update_config(Conf)) of - {ok, ok, _} -> - ok; - Err -> - Err - end. - -%% @doc Handles incoming `ra_events'. Events carry both internal "bookeeping" -%% events emitted by the `ra' leader as well as `rabbit_fifo' emitted events such -%% as message deliveries. All ra events need to be handled by {@module} -%% to ensure bookeeping, resends and flow control is correctly handled. -%% -%% If the `ra_event' contains a `rabbit_fifo' generated message it will be returned -%% for further processing. -%% -%% Example: -%% -%% ``` -%% receive -%% {ra_event, From, Evt} -> -%% case rabbit_fifo_client:handle_ra_event(From, Evt, State0) of -%% {internal, _Seq, State} -> State; -%% {{delivery, _ConsumerTag, Msgs}, State} -> -%% handle_messages(Msgs), -%% ... -%% end -%% end -%% ''' -%% -%% @param From the {@link ra:server_id().} of the sending process. -%% @param Event the body of the `ra_event'. -%% @param State the current {@module} state. -%% -%% @returns -%% `{internal, AppliedCorrelations, State}' if the event contained an internally -%% handled event such as a notification and a correlation was included with -%% the command (e.g. in a call to `enqueue/3' the correlation terms are returned -%% here. -%% -%% `{RaFifoEvent, State}' if the event contained a client message generated by -%% the `rabbit_fifo' state machine such as a delivery. -%% -%% The type of `rabbit_fifo' client messages that can be received are: -%% -%% `{delivery, ConsumerTag, [{MsgId, {MsgHeader, Msg}}]}' -%% -%% <li>`ConsumerTag' the binary tag passed to {@link checkout/3.}</li> -%% <li>`MsgId' is a consumer scoped monotonically incrementing id that can be -%% used to {@link settle/3.} (roughly: AMQP 0.9.1 ack) message once finished -%% with them.</li> --spec handle_ra_event(ra:server_id(), ra_server_proc:ra_event_body(), state()) -> - {internal, Correlators :: [term()], actions(), state()} | - {rabbit_fifo:client_msg(), state()} | eol. -handle_ra_event(From, {applied, Seqs}, - #state{cfg = #cfg{cluster_name = QRef, - soft_limit = SftLmt, - unblock_handler = UnblockFun}} = State0) -> - - {Corrs, Actions0, State1} = lists:foldl(fun seq_applied/2, - {[], [], State0#state{leader = From}}, - Seqs), - Actions = case Corrs of - [] -> - lists:reverse(Actions0); - _ -> - [{settled, QRef, Corrs} - | lists:reverse(Actions0)] - end, - case maps:size(State1#state.pending) < SftLmt of - true when State1#state.slow == true -> - % we have exited soft limit state - % send any unsent commands and cancel the time as - % TODO: really the timer should only be cancelled when the channel - % exits flow state (which depends on the state of all queues the - % channel is interacting with) - % but the fact the queue has just applied suggests - % it's ok to cancel here anyway - State2 = cancel_timer(State1#state{slow = false, - unsent_commands = #{}}), - % build up a list of commands to issue - Commands = maps:fold( - fun (Cid, {Settled, Returns, Discards}, Acc) -> - add_command(Cid, settle, Settled, - add_command(Cid, return, Returns, - add_command(Cid, discard, - Discards, Acc))) - end, [], State1#state.unsent_commands), - Node = pick_server(State2), - %% send all the settlements and returns - State = lists:foldl(fun (C, S0) -> - case send_command(Node, undefined, - C, normal, S0) of - {T, S} when T =/= error -> - S - end - end, State2, Commands), - UnblockFun(), - {ok, State, Actions}; - _ -> - {ok, State1, Actions} - end; -handle_ra_event(From, {machine, {delivery, _ConsumerTag, _} = Del}, State0) -> - handle_delivery(From, Del, State0); -handle_ra_event(_, {machine, {queue_status, Status}}, - #state{} = State) -> - %% just set the queue status - {ok, State#state{queue_status = Status}, []}; -handle_ra_event(Leader, {machine, leader_change}, - #state{leader = Leader} = State) -> - %% leader already known - {ok, State, []}; -handle_ra_event(Leader, {machine, leader_change}, State0) -> - %% we need to update leader - %% and resend any pending commands - State = resend_all_pending(State0#state{leader = Leader}), - {ok, State, []}; -handle_ra_event(_From, {rejected, {not_leader, undefined, _Seq}}, State0) -> - % TODO: how should these be handled? re-sent on timer or try random - {ok, State0, []}; -handle_ra_event(_From, {rejected, {not_leader, Leader, Seq}}, State0) -> - State1 = State0#state{leader = Leader}, - State = resend(Seq, State1), - {ok, State, []}; -handle_ra_event(_, timeout, #state{cfg = #cfg{servers = Servers}} = State0) -> - case find_leader(Servers) of - undefined -> - %% still no leader, set the timer again - {ok, set_timer(State0), []}; - Leader -> - State = resend_all_pending(State0#state{leader = Leader}), - {ok, State, []} - end; -handle_ra_event(_Leader, {machine, eol}, _State0) -> - eol. - -%% @doc Attempts to enqueue a message using cast semantics. This provides no -%% guarantees or retries if the message fails to achieve consensus or if the -%% servers sent to happens not to be available. If the message is sent to a -%% follower it will attempt the deliver it to the leader, if known. Else it will -%% drop the messages. -%% -%% NB: only use this for non-critical enqueues where a full rabbit_fifo_client state -%% cannot be maintained. -%% -%% @param CusterId the cluster id. -%% @param Servers the known servers in the cluster. -%% @param Msg the message to enqueue. -%% -%% @returns `ok' --spec untracked_enqueue([ra:server_id()], term()) -> - ok. -untracked_enqueue([Node | _], Msg) -> - Cmd = rabbit_fifo:make_enqueue(undefined, undefined, Msg), - ok = ra:pipeline_command(Node, Cmd), - ok. - -%% Internal - -try_process_command([Server | Rem], Cmd, State) -> - case ra:process_command(Server, Cmd, 30000) of - {ok, _, Leader} -> - {ok, State#state{leader = Leader}}; - Err when length(Rem) =:= 0 -> - Err; - _ -> - try_process_command(Rem, Cmd, State) - end. - -seq_applied({Seq, MaybeAction}, - {Corrs, Actions0, #state{last_applied = Last} = State0}) - when Seq > Last -> - State1 = do_resends(Last+1, Seq-1, State0), - {Actions, State} = maybe_add_action(MaybeAction, Actions0, State1), - case maps:take(Seq, State#state.pending) of - {{undefined, _}, Pending} -> - {Corrs, Actions, State#state{pending = Pending, - last_applied = Seq}}; - {{Corr, _}, Pending} -> - {[Corr | Corrs], Actions, State#state{pending = Pending, - last_applied = Seq}}; - error -> - % must have already been resent or removed for some other reason - % still need to update last_applied or we may inadvertently resend - % stuff later - {Corrs, Actions, State#state{last_applied = Seq}} - end; -seq_applied(_Seq, Acc) -> - Acc. - -maybe_add_action(ok, Acc, State) -> - {Acc, State}; -maybe_add_action({multi, Actions}, Acc0, State0) -> - lists:foldl(fun (Act, {Acc, State}) -> - maybe_add_action(Act, Acc, State) - end, {Acc0, State0}, Actions); -maybe_add_action({send_drained, {Tag, Credit}} = Action, Acc, - #state{consumer_deliveries = CDels} = State) -> - %% add credit to consumer delivery_count - C = maps:get(Tag, CDels), - {[Action | Acc], - State#state{consumer_deliveries = - update_consumer(Tag, C#consumer.last_msg_id, - Credit, C, CDels)}}; -maybe_add_action(Action, Acc, State) -> - %% anything else is assumed to be an action - {[Action | Acc], State}. - -do_resends(From, To, State) when From =< To -> - % ?INFO("rabbit_fifo_client: doing resends From ~w To ~w~n", [From, To]), - lists:foldl(fun resend/2, State, lists:seq(From, To)); -do_resends(_, _, State) -> - State. - -% resends a command with a new sequence number -resend(OldSeq, #state{pending = Pending0, leader = Leader} = State) -> - case maps:take(OldSeq, Pending0) of - {{Corr, Cmd}, Pending} -> - %% resends aren't subject to flow control here - resend_command(Leader, Corr, Cmd, State#state{pending = Pending}); - error -> - State - end. - -resend_all_pending(#state{pending = Pend} = State) -> - Seqs = lists:sort(maps:keys(Pend)), - lists:foldl(fun resend/2, State, Seqs). - -maybe_auto_ack(true, Deliver, State0) -> - %% manual ack is enabled - {ok, State0, [Deliver]}; -maybe_auto_ack(false, {deliver, Tag, _Ack, Msgs} = Deliver, State0) -> - %% we have to auto ack these deliveries - MsgIds = [I || {_, _, I, _, _} <- Msgs], - {State, Actions} = settle(Tag, MsgIds, State0), - {ok, State, [Deliver] ++ Actions}. - - -handle_delivery(Leader, {delivery, Tag, [{FstId, _} | _] = IdMsgs}, - #state{cfg = #cfg{cluster_name = QName}, - consumer_deliveries = CDels0} = State0) -> - QRef = qref(Leader), - {LastId, _} = lists:last(IdMsgs), - Consumer = #consumer{ack = Ack} = maps:get(Tag, CDels0), - %% format as a deliver action - Del = {deliver, Tag, Ack, transform_msgs(QName, QRef, IdMsgs)}, - %% TODO: remove potential default allocation - case Consumer of - #consumer{last_msg_id = Prev} = C - when FstId =:= Prev+1 -> - maybe_auto_ack(Ack, Del, - State0#state{consumer_deliveries = - update_consumer(Tag, LastId, - length(IdMsgs), C, - CDels0)}); - #consumer{last_msg_id = Prev} = C - when FstId > Prev+1 -> - NumMissing = FstId - Prev + 1, - %% there may actually be fewer missing messages returned than expected - %% This can happen when a node the channel is on gets disconnected - %% from the node the leader is on and then reconnected afterwards. - %% When the node is disconnected the leader will return all checked - %% out messages to the main queue to ensure they don't get stuck in - %% case the node never comes back. - case get_missing_deliveries(Leader, Prev+1, FstId-1, Tag) of - {protocol_error, _, _, _} = Err -> - Err; - Missing -> - XDel = {deliver, Tag, Ack, transform_msgs(QName, QRef, - Missing ++ IdMsgs)}, - maybe_auto_ack(Ack, XDel, - State0#state{consumer_deliveries = - update_consumer(Tag, LastId, - length(IdMsgs) + NumMissing, - C, CDels0)}) - end; - #consumer{last_msg_id = Prev} - when FstId =< Prev -> - case lists:dropwhile(fun({Id, _}) -> Id =< Prev end, IdMsgs) of - [] -> - {ok, State0, []}; - IdMsgs2 -> - handle_delivery(Leader, {delivery, Tag, IdMsgs2}, State0) - end; - C when FstId =:= 0 -> - % the very first delivery - maybe_auto_ack(Ack, Del, - State0#state{consumer_deliveries = - update_consumer(Tag, LastId, - length(IdMsgs), - C#consumer{last_msg_id = LastId}, - CDels0)}) - end. - -transform_msgs(QName, QRef, Msgs) -> - lists:map( - fun({MsgId, {MsgHeader, Msg0}}) -> - {Msg, Redelivered} = case MsgHeader of - #{delivery_count := C} -> - {add_delivery_count_header(Msg0, C), true}; - _ -> - {Msg0, false} - end, - {QName, QRef, MsgId, Redelivered, Msg} - end, Msgs). - -update_consumer(Tag, LastId, DelCntIncr, - #consumer{delivery_count = D} = C, Consumers) -> - maps:put(Tag, - C#consumer{last_msg_id = LastId, - delivery_count = D + DelCntIncr}, - Consumers). - - -get_missing_deliveries(Leader, From, To, ConsumerTag) -> - ConsumerId = consumer_id(ConsumerTag), - % ?INFO("get_missing_deliveries for ~w from ~b to ~b", - % [ConsumerId, From, To]), - Query = fun (State) -> - rabbit_fifo:get_checked_out(ConsumerId, From, To, State) - end, - case ra:local_query(Leader, Query) of - {ok, {_, Missing}, _} -> - Missing; - {error, Error} -> - {protocol_error, internal_error, "Cannot query missing deliveries from ~p: ~p", - [Leader, Error]}; - {timeout, _} -> - {protocol_error, internal_error, "Cannot query missing deliveries from ~p: timeout", - [Leader]} - end. - -pick_server(#state{leader = undefined, - cfg = #cfg{servers = [N | _]}}) -> - %% TODO: pick random rather that first? - N; -pick_server(#state{leader = Leader}) -> - Leader. - -% servers sorted by last known leader -sorted_servers(#state{leader = undefined, - cfg = #cfg{servers = Servers}}) -> - Servers; -sorted_servers(#state{leader = Leader, - cfg = #cfg{servers = Servers}}) -> - [Leader | lists:delete(Leader, Servers)]. - -next_seq(#state{next_seq = Seq} = State) -> - {Seq, State#state{next_seq = Seq + 1}}. - -next_enqueue_seq(#state{next_enqueue_seq = Seq} = State) -> - {Seq, State#state{next_enqueue_seq = Seq + 1}}. - -consumer_id(ConsumerTag) -> - {ConsumerTag, self()}. - -send_command(Server, Correlation, Command, Priority, - #state{pending = Pending, - cfg = #cfg{soft_limit = SftLmt}} = State0) -> - {Seq, State} = next_seq(State0), - ok = ra:pipeline_command(Server, Command, Seq, Priority), - Tag = case maps:size(Pending) >= SftLmt of - true -> slow; - false -> ok - end, - {Tag, State#state{pending = Pending#{Seq => {Correlation, Command}}, - slow = Tag == slow}}. - -resend_command(Node, Correlation, Command, - #state{pending = Pending} = State0) -> - {Seq, State} = next_seq(State0), - ok = ra:pipeline_command(Node, Command, Seq), - State#state{pending = Pending#{Seq => {Correlation, Command}}}. - -add_command(_, _, [], Acc) -> - Acc; -add_command(Cid, settle, MsgIds, Acc) -> - [rabbit_fifo:make_settle(Cid, MsgIds) | Acc]; -add_command(Cid, return, MsgIds, Acc) -> - [rabbit_fifo:make_return(Cid, MsgIds) | Acc]; -add_command(Cid, discard, MsgIds, Acc) -> - [rabbit_fifo:make_discard(Cid, MsgIds) | Acc]. - -set_timer(#state{leader = Leader0, - cfg = #cfg{servers = [Server | _], - cluster_name = QName}} = State) -> - Leader = case Leader0 of - undefined -> Server; - _ -> - Leader0 - end, - Ref = erlang:send_after(?TIMER_TIME, self(), - {'$gen_cast', - {queue_event, QName, {Leader, timeout}}}), - State#state{timer_state = Ref}. - -cancel_timer(#state{timer_state = undefined} = State) -> - State; -cancel_timer(#state{timer_state = Ref} = State) -> - erlang:cancel_timer(Ref, [{async, true}, {info, false}]), - State#state{timer_state = undefined}. - -find_leader([]) -> - undefined; -find_leader([Server | Servers]) -> - case ra:members(Server, 500) of - {ok, _, Leader} -> Leader; - _ -> - find_leader(Servers) - end. - -qref({Ref, _}) -> Ref; -qref(Ref) -> Ref. - -get_credit_mode(#{args := Args}) -> - case rabbit_misc:table_lookup(Args, <<"x-credit">>) of - {_Key, Value} -> - Value; - _ -> - simple_prefetch - end; -get_credit_mode(_) -> - simple_prefetch. |