%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
%%

%% @doc Provides an easy to consume API for interacting with the {@link rabbit_fifo.}
%% state machine implementation running inside a `ra' raft system.
%%
%% Handles command tracking and other non-functional concerns.

-module(rabbit_fifo_client).

-export([
         init/2,
         init/3,
         init/5,
         checkout/4,
         checkout/5,
         cancel_checkout/2,
         enqueue/2,
         enqueue/3,
         dequeue/3,
         settle/3,
         return/3,
         discard/3,
         credit/4,
         handle_ra_event/3,
         untracked_enqueue/2,
         purge/1,
         cluster_name/1,
         update_machine_state/2,
         pending_size/1,
         stat/1,
         stat/2
        ]).

-include_lib("rabbit_common/include/rabbit.hrl").

%% Default soft limit; used by init/2 and as the default of #cfg.soft_limit.
-define(SOFT_LIMIT, 32).
%% NOTE(review): presumably a timer interval in milliseconds - confirm at
%% the point of use.
-define(TIMER_TIME, 10000).

%% Sequence number assigned to commands issued by this module.
-type seq() :: non_neg_integer().
%% last_applied is initialised to -1
-type maybe_seq() :: integer().
%% Actions that may be handed back to the caller for execution.
-type action() :: {send_credit_reply, Available :: non_neg_integer()} |
                  {send_drained, CTagCredit ::
                   {rabbit_fifo:consumer_tag(), non_neg_integer()}}.
-type actions() :: [action()].

-type cluster_name() :: rabbit_types:r(queue).

%% Per consumer tag bookkeeping kept on the client side.
%% last_msg_id is -1 until a message id has been recorded for the consumer.
-record(consumer, {last_msg_id :: seq() | -1,
                   ack = false :: boolean(),
                   delivery_count = 0 :: non_neg_integer()}).

%% Static configuration of a client session.
%% timeout is stored in milliseconds (the init functions multiply a value
%% in seconds by 1000).
%% block_handler / unblock_handler default to no-op funs.
-record(cfg, {cluster_name :: cluster_name(),
              servers = [] :: [ra:server_id()],
              soft_limit = ?SOFT_LIMIT :: non_neg_integer(),
              block_handler = fun() -> ok end :: fun(() -> term()),
              unblock_handler = fun() -> ok end :: fun(() -> ok),
              timeout :: non_neg_integer(),
              version = 0 :: non_neg_integer()}).
%% Mutable client session state.
-record(state, {cfg :: #cfg{},
                leader :: undefined | ra:server_id(),
                queue_status :: undefined | go | reject_publish,
                next_seq = 0 :: seq(),
                %% Last applied is initialised to -1 to note that no command
                %% has yet been applied, but allowing to resend messages if
                %% the first ones on the sequence are lost (messages are sent
                %% from last_applied + 1)
                last_applied = -1 :: maybe_seq(),
                next_enqueue_seq = 1 :: seq(),
                %% indicates that we've exceeded the soft limit
                slow = false :: boolean(),
                %% stashed {Settles, Returns, Discards} message ids per
                %% consumer, kept while `slow' is true
                unsent_commands = #{} :: #{rabbit_fifo:consumer_id() =>
                                           {[seq()], [seq()], [seq()]}},
                pending = #{} :: #{seq() =>
                                   {term(), rabbit_fifo:command()}},
                consumer_deliveries = #{} :: #{rabbit_fifo:consumer_tag() =>
                                               #consumer{}},
                timer_state :: term()
               }).

-opaque state() :: #state{}.

-export_type([
              state/0,
              actions/0
             ]).

%% @doc Create the initial state for a new rabbit_fifo session using the
%% default soft limit. A state is needed to interact with a rabbit_fifo
%% queue using @module.
%% @param ClusterName the id of the cluster to interact with
%% @param Servers The known servers of the queue. If the current leader is known
%% ensure the leader node is at the head of the list.
-spec init(cluster_name(), [ra:server_id()]) -> state().
init(ClusterName, Servers) ->
    init(ClusterName, Servers, ?SOFT_LIMIT).

%% @doc Create the initial state for a new rabbit_fifo session. A state is
%% needed to interact with a rabbit_fifo queue using @module.
%% @param ClusterName the id of the cluster to interact with
%% @param Servers The known servers of the queue. If the current leader is known
%% ensure the leader node is at the head of the list.
%% @param SoftLimit the soft limit stored in the session configuration.
-spec init(cluster_name(), [ra:server_id()], non_neg_integer()) -> state().
init(#resource{} = ClusterName, Servers, SoftLimit) ->
    %% net ticktime is in seconds; the stored timeout is in milliseconds
    TickTimeSecs = application:get_env(kernel, net_ticktime, 60),
    Cfg = #cfg{cluster_name = ClusterName,
               servers = Servers,
               soft_limit = SoftLimit,
               timeout = (TickTimeSecs + 5) * 1000},
    #state{cfg = Cfg}.
%% Same as init/3 but additionally installs the funs stored as the block
%% and unblock handlers of the session configuration.
-spec init(cluster_name(), [ra:server_id()], non_neg_integer(),
           fun(() -> ok), fun(() -> ok)) -> state().
init(#resource{} = ClusterName, Servers, SoftLimit, BlockFun, UnblockFun) ->
    %% net ticktime is in seconds; the stored timeout is in milliseconds
    TickTimeSecs = application:get_env(kernel, net_ticktime, 60),
    Cfg = #cfg{cluster_name = ClusterName,
               servers = Servers,
               block_handler = BlockFun,
               unblock_handler = UnblockFun,
               soft_limit = SoftLimit,
               timeout = (TickTimeSecs + 5) * 1000},
    #state{cfg = Cfg}.

%% @doc Enqueues a message.
%% @param Correlation an arbitrary erlang term used to correlate this
%% command when it has been applied.
%% @param Msg an arbitrary erlang term representing the message.
%% @param State the current {@module} state.
%% @returns
%% `{ok | slow | reject_publish, State}'. `ok' and `slow' mean the command
%% was sent; `slow' additionally means the limit is approaching and it is
%% time to slow down the sending rate. `reject_publish' means the message
%% was not sent.
%% {@module} assigns a sequence number to every raft command it issues. The
%% SequenceNumber can be correlated to the applied sequence numbers returned
%% by the {@link handle_ra_event/3. handle_ra_event/3} function.
-spec enqueue(Correlation :: term(), Msg :: term(), State :: state()) ->
    {ok | slow | reject_publish, state()}.
enqueue(Correlation, Msg,
        #state{queue_status = undefined,
               next_enqueue_seq = 1,
               cfg = #cfg{timeout = Timeout}} = State0) ->
    %% it is the first enqueue, check the version
    {_, Node} = Server = pick_server(State0),
    case rpc:call(Node, ra_machine, version, [{machine, rabbit_fifo, #{}}]) of
        0 ->
            %% the leader is running the old version
            %% so we can't initialize the enqueuer session safely
            %% fall back on old behaviour
            enqueue(Correlation, Msg, State0#state{queue_status = go});
        Vsn when is_integer(Vsn), Vsn >= 1 ->
            %% the leader runs a version that supports enqueuer
            %% registration (any version after 0, not only version 1):
            %% do sync initialisation of the enqueuer session
            Reg = rabbit_fifo:make_register_enqueuer(self()),
            case ra:process_command(Server, Reg, Timeout) of
                {ok, reject_publish, _} ->
                    {reject_publish,
                     State0#state{queue_status = reject_publish}};
                {ok, ok, _} ->
                    enqueue(Correlation, Msg,
                            State0#state{queue_status = go});
                {timeout, _} ->
                    %% if we timeout it is probably better to reject
                    %% the message than being uncertain
                    {reject_publish, State0};
                Err ->
                    exit(Err)
            end;
        {badrpc, nodedown} ->
            %% only `nodedown' is handled; any other rpc failure is not
            %% matched and crashes the caller
            {reject_publish, State0}
    end;
enqueue(_Correlation, _Msg,
        #state{queue_status = reject_publish,
               cfg = #cfg{}} = State) ->
    {reject_publish, State};
enqueue(Correlation, Msg,
        #state{slow = Slow,
               queue_status = go,
               cfg = #cfg{block_handler = BlockFun}} = State0) ->
    Node = pick_server(State0),
    {Next, State1} = next_enqueue_seq(State0),
    % by default there is no correlation id
    Cmd = rabbit_fifo:make_enqueue(self(), Next, Msg),
    case send_command(Node, Correlation, Cmd, low, State1) of
        {slow, State} when not Slow ->
            %% first transition into slow: run the block handler once
            BlockFun(),
            {slow, set_timer(State)};
        Any ->
            Any
    end.

%% @doc Enqueues a message without a correlation term
%% (the correlation is `undefined').
%% @param Msg an arbitrary erlang term representing the message.
%% @param State the current {@module} state.
%% @returns
%% `{ok | slow | reject_publish, State}'. `ok' and `slow' mean the command
%% was sent; `slow' additionally means the limit is approaching and it is
%% time to slow down the sending rate. `reject_publish' means the message
%% was not sent.
%% {@module} assigns a sequence number to every raft command it issues. The
%% SequenceNumber can be correlated to the applied sequence numbers returned
%% by the {@link handle_ra_event/3. handle_ra_event/3} function.
%%
-spec enqueue(Msg :: term(), State :: state()) ->
    {ok | slow | reject_publish, state()}.
enqueue(Msg, State) ->
    enqueue(undefined, Msg, State).

%% @doc Dequeue a message from the queue.
%%
%% This is a synchronous call. I.e. the call will block until the command
%% has been accepted by the ra process or it times out.
%%
%% @param ConsumerTag a unique tag to identify this particular consumer.
%% @param Settlement either `settled' or `unsettled'. When `settled' no
%% further settlement needs to be done.
%% @param State The {@module} state.
%%
%% @returns `{ok, MsgsReady, {QName, QRef, MsgId, IsDelivered, Msg}, State}'
%% when a message was dequeued, `{empty, State}' when the queue reported
%% no message, or `{error | timeout, term()}'
-spec dequeue(rabbit_fifo:consumer_tag(),
              Settlement :: settled | unsettled, state()) ->
    {ok, non_neg_integer(), term(), state()}
    | {empty, state()} | {error | timeout, term()}.
dequeue(ConsumerTag, Settlement,
        #state{cfg = #cfg{timeout = Timeout,
                          cluster_name = QName}} = State0) ->
    Node = pick_server(State0),
    ConsumerId = consumer_id(ConsumerTag),
    case ra:process_command(Node,
                            rabbit_fifo:make_checkout(ConsumerId,
                                                      {dequeue, Settlement},
                                                      #{}),
                            Timeout) of
        {ok, {dequeue, empty}, Leader} ->
            {empty, State0#state{leader = Leader}};
        {ok, {dequeue, {MsgId, {MsgHeader, Msg0}}, MsgsReady}, Leader} ->
            %% a missing delivery_count header key counts as 0 deliveries
            Count = case MsgHeader of
                        #{delivery_count := C} -> C;
                        _ -> 0
                    end,
            IsDelivered = Count > 0,
            Msg = add_delivery_count_header(Msg0, Count),
            {ok, MsgsReady,
             {QName, qref(Leader), MsgId, IsDelivered, Msg},
             State0#state{leader = Leader}};
        {ok, {error, _} = Err, _Leader} ->
            %% the command was processed but the machine replied with an error
            Err;
        Err ->
            Err
    end.

%% Adds the `x-delivery-count' header to `#basic_message{}' messages when
%% the count is an integer; any other message is returned unchanged.
add_delivery_count_header(#basic_message{} = Msg0, Count)
  when is_integer(Count) ->
    rabbit_basic:add_header(<<"x-delivery-count">>, long, Count, Msg0);
add_delivery_count_header(Msg, _Count) ->
    Msg.

%% @doc Settle a message. Permanently removes message from the queue.
%% @param ConsumerTag the tag uniquely identifying the consumer.
%% @param MsgIds the message ids received with the {@link rabbit_fifo:delivery/0.}
%% @param State the {@module} state
%% @returns
%% `{State, []}'. When the session is not slow the settle command is sent
%% immediately; otherwise the message ids are stashed in the state to be
%% sent later.
%%
-spec settle(rabbit_fifo:consumer_tag(), [rabbit_fifo:msg_id()], state()) ->
    {state(), list()}.
settle(ConsumerTag, [_|_] = MsgIds, #state{slow = false} = State0) ->
    Server = pick_server(State0),
    ConsumerId = consumer_id(ConsumerTag),
    Cmd = rabbit_fifo:make_settle(ConsumerId, MsgIds),
    case send_command(Server, undefined, Cmd, normal, State0) of
        {_Tag, State} ->
            %% the ok | slow tag is not propagated by this function
            {State, []}
    end;
settle(ConsumerTag, [_|_] = MsgIds,
       #state{unsent_commands = Unsent0} = State0) ->
    %% we've reached the soft limit so will stash the command to be
    %% sent once we have seen enough notifications
    Stash = fun({Settles, Returns, Discards}) ->
                    {Settles ++ MsgIds, Returns, Discards}
            end,
    Unsent = maps:update_with(consumer_id(ConsumerTag), Stash,
                              {MsgIds, [], []}, Unsent0),
    {State0#state{unsent_commands = Unsent}, []}.

%% @doc Return a message to the queue.
%% @param ConsumerTag the tag uniquely identifying the consumer.
%% @param MsgIds the message ids to return received
%% from {@link rabbit_fifo:delivery/0.}
%% @param State the {@module} state
%% @returns
%% `{State, []}'. When the session is not slow the return command is sent
%% immediately; otherwise the message ids are stashed in the state to be
%% sent later.
%%
-spec return(rabbit_fifo:consumer_tag(), [rabbit_fifo:msg_id()], state()) ->
    {state(), list()}.
return(ConsumerTag, [_|_] = MsgIds, #state{slow = false} = State0) ->
    Server = pick_server(State0),
    % TODO: make rabbit_fifo return support lists of message ids
    ConsumerId = consumer_id(ConsumerTag),
    Cmd = rabbit_fifo:make_return(ConsumerId, MsgIds),
    case send_command(Server, undefined, Cmd, normal, State0) of
        {_Tag, State} ->
            %% the ok | slow tag is not propagated by this function
            {State, []}
    end;
return(ConsumerTag, [_|_] = MsgIds,
       #state{unsent_commands = Unsent0} = State0) ->
    %% we've reached the soft limit so will stash the command to be
    %% sent once we have seen enough notifications
    Stash = fun({Settles, Returns, Discards}) ->
                    {Settles, Returns ++ MsgIds, Discards}
            end,
    Unsent = maps:update_with(consumer_id(ConsumerTag), Stash,
                              {[], MsgIds, []}, Unsent0),
    {State0#state{unsent_commands = Unsent}, []}.

%% @doc Discards a checked out message.
%% If the queue has a dead_letter_handler configured this will be called.
%% @param ConsumerTag the tag uniquely identifying the consumer.
%% @param MsgIds the message ids to discard
%% from {@link rabbit_fifo:delivery/0.}
%% @param State the {@module} state
%% @returns
%% `{State, []}'. When the session is not slow the discard command is sent
%% immediately; otherwise the message ids are stashed in the state to be
%% sent later.
-spec discard(rabbit_fifo:consumer_tag(), [rabbit_fifo:msg_id()], state()) ->
    {state(), list()}.
discard(ConsumerTag, [_|_] = MsgIds, #state{slow = false} = State0) ->
    Server = pick_server(State0),
    ConsumerId = consumer_id(ConsumerTag),
    Cmd = rabbit_fifo:make_discard(ConsumerId, MsgIds),
    case send_command(Server, undefined, Cmd, normal, State0) of
        {_Tag, State} ->
            %% the ok | slow tag is not propagated by this function
            {State, []}
    end;
discard(ConsumerTag, [_|_] = MsgIds,
        #state{unsent_commands = Unsent0} = State0) ->
    %% we've reached the soft limit so will stash the command to be
    %% sent once we have seen enough notifications
    Stash = fun({Settles, Returns, Discards}) ->
                    {Settles, Returns, Discards ++ MsgIds}
            end,
    Unsent = maps:update_with(consumer_id(ConsumerTag), Stash,
                              {[], [], MsgIds}, Unsent0),
    {State0#state{unsent_commands = Unsent}, []}.

%% @doc Register with the rabbit_fifo queue to "checkout" messages as they
%% become available, using the `simple_prefetch' credit mode.
%%
%% This is a synchronous call. I.e. the call will block until the command
%% has been accepted by the ra process or it times out.
%%
%% @param ConsumerTag a unique tag to identify this particular consumer.
%% @param NumUnsettled the maximum number of in-flight messages. Once this
%% number of messages has been received but not settled no further messages
%% will be delivered to the consumer.
%% @param ConsumerInfo consumer meta data; must be a map.
%% @param State The {@module} state.
%%
%% @returns `{ok, State}' or `{error | timeout, term()}'
-spec checkout(rabbit_fifo:consumer_tag(), NumUnsettled :: non_neg_integer(),
               rabbit_fifo:consumer_meta(),
               state()) -> {ok, state()} | {error | timeout, term()}.
checkout(ConsumerTag, NumUnsettled, ConsumerInfo, State0)
  when is_map(ConsumerInfo) ->
    checkout(ConsumerTag, NumUnsettled, simple_prefetch, ConsumerInfo, State0).

%% @doc Register with the rabbit_fifo queue to "checkout" messages as they
%% become available.
%%
%% This is a synchronous call. I.e. the call will block until the command
%% has been accepted by the ra process or it times out.
%%
%% @param ConsumerTag a unique tag to identify this particular consumer.
%% @param NumUnsettled the maximum number of in-flight messages. Once this
%% number of messages has been received but not settled no further messages
%% will be delivered to the consumer.
%% @param CreditMode The credit mode to use for the checkout.
%%     simple_prefetch: credit is auto topped up as deliveries are settled
%%     credited: credit is only increased by sending credit to the queue
%% @param Meta consumer meta data; the optional `ack' key (default `true')
%% is recorded in the client side consumer record.
%% @param State The {@module} state.
%%
%% @returns `{ok, State}' or `{error | timeout, term()}'
-spec checkout(rabbit_fifo:consumer_tag(),
               NumUnsettled :: non_neg_integer(),
               CreditMode :: rabbit_fifo:credit_mode(),
               Meta :: rabbit_fifo:consumer_meta(),
               state()) -> {ok, state()} | {error | timeout, term()}.
checkout(ConsumerTag, NumUnsettled, CreditMode, Meta,
         #state{consumer_deliveries = CDels0} = State0) ->
    Servers = sorted_servers(State0),
    ConsumerId = {ConsumerTag, self()},
    Spec = {auto, NumUnsettled, CreditMode},
    Cmd = rabbit_fifo:make_checkout(ConsumerId, Spec, Meta),
    %% ack mode defaults to true when Meta does not carry an `ack' key
    Ack = maps:get(ack, Meta, true),
    %% an existing consumer record only has its ack flag refreshed,
    %% otherwise a fresh record is inserted
    SetAck = fun(Consumer) -> Consumer#consumer{ack = Ack} end,
    Fresh = #consumer{last_msg_id = -1, ack = Ack},
    CDels = maps:update_with(ConsumerTag, SetAck, Fresh, CDels0),
    try_process_command(Servers, Cmd,
                        State0#state{consumer_deliveries = CDels}).

%% @doc Provide credit to the queue
%%
%% This only has an effect if the consumer uses credit mode: credited
%% @param ConsumerTag a unique tag to identify this particular consumer.
%% @param Credit the amount of credit to provide to the queue
%% @param Drain tells the queue to use up any credit that cannot be immediately
%% fulfilled. (i.e. there are not enough messages on queue to use up all the
%% provided credit).
%% @param State The {@module} state.
%% @returns the updated state.
-spec credit(rabbit_fifo:consumer_tag(),
             Credit :: non_neg_integer(),
             Drain :: boolean(),
             state()) -> state().
credit(ConsumerTag, Credit, Drain,
       #state{consumer_deliveries = CDels} = State0) ->
    ConsumerId = consumer_id(ConsumerTag),
    %% the last received msgid provides us with the delivery count if we
    %% add one as it is 0 indexed
    Consumer = maps:get(ConsumerTag, CDels, #consumer{last_msg_id = -1}),
    Server = pick_server(State0),
    DeliveryCount = Consumer#consumer.last_msg_id + 1,
    Cmd = rabbit_fifo:make_credit(ConsumerId, Credit, DeliveryCount, Drain),
    case send_command(Server, undefined, Cmd, normal, State0) of
        {_Tag, State} ->
            %% the ok | slow tag is not propagated by this function
            State
    end.

%% @doc Cancels a checkout with the rabbit_fifo queue for the consumer tag
%%
%% This is a synchronous call. I.e. the call will block until the command
%% has been accepted by the ra process or it times out.
%%
%% @param ConsumerTag a unique tag to identify this particular consumer.
%% @param State The {@module} state.
%%
%% @returns `{ok, State}' or `{error | timeout, term()}'
-spec cancel_checkout(rabbit_fifo:consumer_tag(), state()) ->
    {ok, state()} | {error | timeout, term()}.
cancel_checkout(ConsumerTag, #state{consumer_deliveries = CDels} = State0) ->
    Servers = sorted_servers(State0),
    Cmd = rabbit_fifo:make_checkout({ConsumerTag, self()}, cancel, #{}),
    %% the client side consumer record is dropped before the command is
    %% processed
    Remaining = maps:remove(ConsumerTag, CDels),
    try_process_command(Servers, Cmd,
                        State0#state{consumer_deliveries = Remaining}).

%% @doc Purges all the messages from a rabbit_fifo queue and returns the number
%% of messages purged.
-spec purge(ra:server_id()) -> {ok, non_neg_integer()} | {error | timeout, term()}.
purge(Node) ->
    Cmd = rabbit_fifo:make_purge(),
    case ra:process_command(Node, Cmd) of
        {ok, {purge, Reply}, _} ->
            {ok, Reply};
        Other ->
            %% anything else is handed back to the caller unchanged
            Other
    end.

%% @doc Returns the number of entries in the pending command map.
-spec pending_size(state()) -> non_neg_integer().
pending_size(#state{pending = Pending}) ->
    maps:size(Pending).

%% @doc Same as stat/2 with a timeout of 250.
-spec stat(ra:server_id()) ->
    {ok, non_neg_integer(), non_neg_integer()}
    | {error | timeout, term()}.
stat(Leader) ->
    %% short timeout as we don't want to spend too long if it is going to
    %% fail anyway
    stat(Leader, 250).
%% @doc Runs `rabbit_fifo:query_stat/1' as a local query against the given
%% server and returns the two counters of the query result.
%% @param Leader the server to query.
%% @param Timeout the timeout passed to the query.
-spec stat(ra:server_id(), non_neg_integer()) ->
    {ok, non_neg_integer(), non_neg_integer()}
    | {error | timeout, term()}.
stat(Leader, Timeout) ->
    QueryFun = fun rabbit_fifo:query_stat/1,
    case ra:local_query(Leader, QueryFun, Timeout) of
        {ok, {_, {First, Second}}, _} ->
            {ok, First, Second};
        {error, _} = Err ->
            Err;
        {timeout, _} = Err ->
            Err
    end.

%% @doc returns the cluster name
-spec cluster_name(state()) -> cluster_name().
cluster_name(#state{cfg = #cfg{cluster_name = Name}}) ->
    Name.

%% Sends an update-config command built from Conf to Server. Returns `ok'
%% when the command is processed with an `ok' reply; any other result is
%% returned unchanged.
update_machine_state(Server, Conf) ->
    Cmd = rabbit_fifo:make_update_config(Conf),
    case ra:process_command(Server, Cmd) of
        {ok, ok, _} ->
            ok;
        Other ->
            Other
    end.

%% @doc Handles incoming `ra_events'. Events carry both internal "bookkeeping"
%% events emitted by the `ra' leader as well as `rabbit_fifo' emitted events such
%% as message deliveries. All ra events need to be handled by {@module}
%% to ensure bookkeeping, resends and flow control is correctly handled.
%%
%% If the `ra_event' contains a `rabbit_fifo' generated message it will be returned
%% for further processing.
%%
%% Example:
%%
%% ```
%%  receive
%%     {ra_event, From, Evt} ->
%%         case rabbit_fifo_client:handle_ra_event(From, Evt, State0) of
%%             {internal, _Seq, State} -> State;
%%             {{delivery, _ConsumerTag, Msgs}, State} ->
%%                  handle_messages(Msgs),
%%                  ...
%%         end
%%  end
%% '''
%%
%% @param From the {@link ra:server_id().} of the sending process.
%% @param Event the body of the `ra_event'.
%% @param State the current {@module} state.
%%
%% @returns
%% `{internal, AppliedCorrelations, State}' if the event contained an internally
%% handled event such as a notification and a correlation was included with
%% the command (e.g. in a call to `enqueue/3' the correlation terms are returned
%% here).
%%
%% `{RaFifoEvent, State}' if the event contained a client message generated by
%% the `rabbit_fifo' state machine such as a delivery.
%% %% The type of `rabbit_fifo' client messages that can be received are: %% %% `{delivery, ConsumerTag, [{MsgId, {MsgHeader, Msg}}]}' %% %%