summary refs log tree commit diff
path: root/src
diff options
context:
space:
mode:
author Karl Nilsson <kjnilsson@gmail.com> 2020-09-15 13:51:32 +0100
committer GitHub <noreply@github.com> 2020-09-15 13:51:32 +0100
commit 48f38874d701e34727b4f482ea42bb0e3e82c460 (patch)
tree a29bbd0db16ee8047118c9a160bda4b28d250d74 /src
parent c2943570b94642122145facc12e3f37d5e6bc5c1 (diff)
parent ddc2eacbe2690ec61dda3a8648f7acf5cec39a03 (diff)
download rabbitmq-server-git-48f38874d701e34727b4f482ea42bb0e3e82c460.tar.gz
Merge pull request #2407 from rabbitmq/qq-reject-publish
reject-publish strategy support for quorum queues
Diffstat (limited to 'src')
-rw-r--r-- src/rabbit_channel.erl 5
-rw-r--r-- src/rabbit_fifo.erl 677
-rw-r--r-- src/rabbit_fifo.hrl 45
-rw-r--r-- src/rabbit_fifo_client.erl 144
-rw-r--r-- src/rabbit_fifo_v0.erl 1961
-rw-r--r-- src/rabbit_fifo_v0.hrl 195
-rw-r--r-- src/rabbit_quorum_queue.erl 82
7 files changed, 2762 insertions, 347 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 74d400950e..1755b4b8e2 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -1871,8 +1871,9 @@ handle_publishing_queue_down(QPid, Reason,
record_rejects(RejectMXs, State1)
end
end;
-handle_publishing_queue_down(QPid, _Reason, _State) when ?IS_QUORUM(QPid) ->
- error(quorum_queues_should_never_be_monitored).
+handle_publishing_queue_down(QPid, _Reason, State) when ?IS_QUORUM(QPid) ->
+ %% this should never happen after the queue type refactoring in 3.9
+ State.
handle_consuming_queue_down_or_eol(QRef,
State = #ch{queue_consumers = QCons,
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index 0d0e37830a..7745789593 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -23,6 +23,9 @@
tick/2,
overview/1,
get_checked_out/4,
+ %% versioning
+ version/0,
+ which_module/1,
%% aux
init_aux/1,
handle_aux/6,
@@ -47,6 +50,7 @@
%% protocol helpers
make_enqueue/3,
+ make_register_enqueuer/1,
make_checkout/3,
make_settle/2,
make_return/2,
@@ -61,6 +65,7 @@
-record(enqueue, {pid :: option(pid()),
seq :: option(msg_seqno()),
msg :: raw_msg()}).
+-record(register_enqueuer, {pid :: pid()}).
-record(checkout, {consumer_id :: consumer_id(),
spec :: checkout_spec(),
meta :: consumer_meta()}).
@@ -80,6 +85,7 @@
-opaque protocol() ::
#enqueue{} |
+ #register_enqueuer{} |
#checkout{} |
#settle{} |
#return{} |
@@ -121,12 +127,14 @@ init(#{name := Name,
update_config(Conf, State) ->
DLH = maps:get(dead_letter_handler, Conf, undefined),
BLH = maps:get(become_leader_handler, Conf, undefined),
- SHI = maps:get(release_cursor_interval, Conf, ?RELEASE_CURSOR_EVERY),
+ RCI = maps:get(release_cursor_interval, Conf, ?RELEASE_CURSOR_EVERY),
+ Overflow = maps:get(overflow_strategy, Conf, drop_head),
MaxLength = maps:get(max_length, Conf, undefined),
MaxBytes = maps:get(max_bytes, Conf, undefined),
MaxMemoryLength = maps:get(max_in_memory_length, Conf, undefined),
MaxMemoryBytes = maps:get(max_in_memory_bytes, Conf, undefined),
DeliveryLimit = maps:get(delivery_limit, Conf, undefined),
+ Expires = maps:get(expires, Conf, undefined),
ConsumerStrategy = case maps:get(single_active_consumer_on, Conf, false) of
true ->
single_active;
@@ -134,24 +142,21 @@ update_config(Conf, State) ->
competing
end,
Cfg = State#?MODULE.cfg,
- SHICur = case State#?MODULE.cfg of
- #cfg{release_cursor_interval = {_, C}} ->
- C;
- #cfg{release_cursor_interval = undefined} ->
- SHI;
- #cfg{release_cursor_interval = C} ->
- C
- end,
+ RCISpec = {RCI, RCI},
- State#?MODULE{cfg = Cfg#cfg{release_cursor_interval = {SHI, SHICur},
+ LastActive = maps:get(created, Conf, undefined),
+ State#?MODULE{cfg = Cfg#cfg{release_cursor_interval = RCISpec,
dead_letter_handler = DLH,
become_leader_handler = BLH,
+ overflow_strategy = Overflow,
max_length = MaxLength,
max_bytes = MaxBytes,
max_in_memory_length = MaxMemoryLength,
max_in_memory_bytes = MaxMemoryBytes,
consumer_strategy = ConsumerStrategy,
- delivery_limit = DeliveryLimit}}.
+ delivery_limit = DeliveryLimit,
+ expires = Expires},
+ last_active = LastActive}.
zero(_) ->
0.
@@ -161,9 +166,27 @@ zero(_) ->
-spec apply(ra_machine:command_meta_data(), command(), state()) ->
{state(), Reply :: term(), ra_machine:effects()} |
{state(), Reply :: term()}.
-apply(Metadata, #enqueue{pid = From, seq = Seq,
- msg = RawMsg}, State00) ->
- apply_enqueue(Metadata, From, Seq, RawMsg, State00);
+apply(Meta, #enqueue{pid = From, seq = Seq,
+ msg = RawMsg}, State00) ->
+ apply_enqueue(Meta, From, Seq, RawMsg, State00);
+apply(_Meta, #register_enqueuer{pid = Pid},
+ #?MODULE{enqueuers = Enqueuers0,
+ cfg = #cfg{overflow_strategy = Overflow}} = State0) ->
+
+ State = case maps:is_key(Pid, Enqueuers0) of
+ true ->
+ %% if the enqueuer exists just echo the overflow state
+ State0;
+ false ->
+ State0#?MODULE{enqueuers = Enqueuers0#{Pid => #enqueuer{}}}
+ end,
+ Res = case is_over_limit(State) of
+ true when Overflow == reject_publish ->
+ reject_publish;
+ _ ->
+ ok
+ end,
+ {State, Res, [{monitor, process, Pid}]};
apply(Meta,
#settle{msg_ids = MsgIds, consumer_id = ConsumerId},
#?MODULE{consumers = Cons0} = State) ->
@@ -212,8 +235,9 @@ apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt,
ServiceQueue0),
Cons = maps:put(ConsumerId, Con1, Cons0),
{State1, ok, Effects} =
- checkout(Meta, State0#?MODULE{service_queue = ServiceQueue,
- consumers = Cons}, []),
+ checkout(Meta, State0,
+ State0#?MODULE{service_queue = ServiceQueue,
+ consumers = Cons}, []),
Response = {send_credit_reply, messages_ready(State1)},
%% by this point all checkouts for the updated credit value
%% should be processed so we can evaluate the drain
@@ -259,10 +283,15 @@ apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt,
apply(_, #checkout{spec = {dequeue, _}},
#?MODULE{cfg = #cfg{consumer_strategy = single_active}} = State0) ->
{State0, {error, unsupported}};
-apply(#{from := From} = Meta, #checkout{spec = {dequeue, Settlement},
+apply(#{index := Index,
+ system_time := Ts,
+ from := From} = Meta, #checkout{spec = {dequeue, Settlement},
meta = ConsumerMeta,
consumer_id = ConsumerId},
- #?MODULE{consumers = Consumers} = State0) ->
+ #?MODULE{consumers = Consumers} = State00) ->
+ %% dequeue always updates last_active
+ State0 = State00#?MODULE{last_active = Ts},
+ %% all dequeue operations result in keeping the queue from expiring
Exists = maps:is_key(ConsumerId, Consumers),
case messages_ready(State0) of
0 ->
@@ -274,60 +303,69 @@ apply(#{from := From} = Meta, #checkout{spec = {dequeue, Settlement},
State1 = update_consumer(ConsumerId, ConsumerMeta,
{once, 1, simple_prefetch},
State0),
- {success, _, MsgId, Msg, State2} = checkout_one(State1),
- {State, Effects} = case Settlement of
- unsettled ->
- {_, Pid} = ConsumerId,
- {State2, [{monitor, process, Pid}]};
- settled ->
- %% immediately settle the checkout
- {State3, _, Effects0} =
- apply(Meta, make_settle(ConsumerId, [MsgId]),
- State2),
- {State3, Effects0}
- end,
+ {success, _, MsgId, Msg, State2} = checkout_one(Meta, State1),
+ {State4, Effects1} = case Settlement of
+ unsettled ->
+ {_, Pid} = ConsumerId,
+ {State2, [{monitor, process, Pid}]};
+ settled ->
+ %% immediately settle the checkout
+ {State3, _, Effects0} =
+ apply(Meta, make_settle(ConsumerId, [MsgId]),
+ State2),
+ {State3, Effects0}
+ end,
+ {Reply, Effects2} =
case Msg of
{RaftIdx, {Header, 'empty'}} ->
%% TODO add here new log effect with reply
- {State, '$ra_no_reply',
- reply_log_effect(RaftIdx, MsgId, Header, Ready - 1, From)};
+ {'$ra_no_reply',
+ [reply_log_effect(RaftIdx, MsgId, Header, Ready - 1, From) |
+ Effects1]};
_ ->
- {State, {dequeue, {MsgId, Msg}, Ready-1}, Effects}
+ {{dequeue, {MsgId, Msg}, Ready-1}, Effects1}
+
+ end,
+
+ case evaluate_limit(Index, false, State0, State4, Effects2) of
+ {State, true, Effects} ->
+ update_smallest_raft_index(Index, Reply, State, Effects);
+ {State, false, Effects} ->
+ {State, Reply, Effects}
end
end;
apply(Meta, #checkout{spec = cancel, consumer_id = ConsumerId}, State0) ->
- {State, Effects} = cancel_consumer(ConsumerId, State0, [], consumer_cancel),
- checkout(Meta, State, Effects);
+ {State, Effects} = cancel_consumer(Meta, ConsumerId, State0, [],
+ consumer_cancel),
+ checkout(Meta, State0, State, Effects);
apply(Meta, #checkout{spec = Spec, meta = ConsumerMeta,
consumer_id = {_, Pid} = ConsumerId},
State0) ->
State1 = update_consumer(ConsumerId, ConsumerMeta, Spec, State0),
- checkout(Meta, State1, [{monitor, process, Pid}]);
-apply(#{index := RaftIdx}, #purge{},
+ checkout(Meta, State0, State1, [{monitor, process, Pid}]);
+apply(#{index := Index}, #purge{},
#?MODULE{ra_indexes = Indexes0,
returns = Returns,
messages = Messages} = State0) ->
Total = messages_ready(State0),
Indexes1 = lists:foldl(fun rabbit_fifo_index:delete/2, Indexes0,
- [I || {I, _} <- lists:sort(maps:values(Messages))]),
+ [I || {_, {I, _}} <- lqueue:to_list(Messages)]),
Indexes = lists:foldl(fun rabbit_fifo_index:delete/2, Indexes1,
[I || {_, {I, _}} <- lqueue:to_list(Returns)]),
- {State, _, Effects} =
- update_smallest_raft_index(RaftIdx,
- State0#?MODULE{ra_indexes = Indexes,
- messages = #{},
- returns = lqueue:new(),
- msg_bytes_enqueue = 0,
- prefix_msgs = {0, [], 0, []},
- low_msg_num = undefined,
- msg_bytes_in_memory = 0,
- msgs_ready_in_memory = 0},
- []),
- %% as we're not checking out after a purge (no point) we have to
- %% reverse the effects ourselves
- {State, {purge, Total},
- lists:reverse([garbage_collection | Effects])};
-apply(Meta, {down, Pid, noconnection},
+
+ State1 = State0#?MODULE{ra_indexes = Indexes,
+ messages = lqueue:new(),
+ returns = lqueue:new(),
+ msg_bytes_enqueue = 0,
+ prefix_msgs = {0, [], 0, []},
+ msg_bytes_in_memory = 0,
+ msgs_ready_in_memory = 0},
+ Effects0 = [garbage_collection],
+ Reply = {purge, Total},
+ {State, _, Effects} = evaluate_limit(Index, false, State0,
+ State1, Effects0),
+ update_smallest_raft_index(Index, Reply, State, Effects);
+apply(#{system_time := Ts} = Meta, {down, Pid, noconnection},
#?MODULE{consumers = Cons0,
cfg = #cfg{consumer_strategy = single_active},
waiting_consumers = Waiting0,
@@ -344,7 +382,7 @@ apply(Meta, {down, Pid, noconnection},
S0, Cid, C0, false, suspected_down, E0),
Checked = C0#consumer.checked_out,
Credit = increase_credit(C0, maps:size(Checked)),
- {St, Effs1} = return_all(S0, Effs,
+ {St, Effs1} = return_all(Meta, S0, Effs,
Cid, C0#consumer{credit = Credit}),
%% if the consumer was cancelled there is a chance it got
%% removed when returning hence we need to be defensive here
@@ -355,7 +393,8 @@ apply(Meta, {down, Pid, noconnection},
Waiting0
end,
{St#?MODULE{consumers = maps:remove(Cid, St#?MODULE.consumers),
- waiting_consumers = Waiting},
+ waiting_consumers = Waiting,
+ last_active = Ts},
Effs1};
(_, _, S) ->
S
@@ -373,8 +412,8 @@ apply(Meta, {down, Pid, noconnection},
(_, E) -> E
end, Enqs0),
Effects = [{monitor, node, Node} | Effects1],
- checkout(Meta, State#?MODULE{enqueuers = Enqs}, Effects);
-apply(Meta, {down, Pid, noconnection},
+ checkout(Meta, State0, State#?MODULE{enqueuers = Enqs}, Effects);
+apply(#{system_time := Ts} = Meta, {down, Pid, noconnection},
#?MODULE{consumers = Cons0,
enqueuers = Enqs0} = State0) ->
%% A node has been disconnected. This doesn't necessarily mean that
@@ -395,7 +434,7 @@ apply(Meta, {down, Pid, noconnection},
Credit = increase_credit(C0, map_size(Checked0)),
C = C0#consumer{status = suspected_down,
credit = Credit},
- {St, Eff0} = return_all(St0, Eff, Cid, C),
+ {St, Eff0} = return_all(Meta, St0, Eff, Cid, C),
Eff1 = consumer_update_active_effects(St, Cid, C, false,
suspected_down, Eff0),
{St, Eff1};
@@ -416,13 +455,14 @@ apply(Meta, {down, Pid, noconnection},
_ ->
[{monitor, node, Node}]
end ++ Effects1,
- checkout(Meta, State#?MODULE{enqueuers = Enqs}, Effects);
+ checkout(Meta, State0, State#?MODULE{enqueuers = Enqs,
+ last_active = Ts}, Effects);
apply(Meta, {down, Pid, _Info}, State0) ->
- {State, Effects} = handle_down(Pid, State0),
- checkout(Meta, State, Effects);
+ {State, Effects} = handle_down(Meta, Pid, State0),
+ checkout(Meta, State0, State, Effects);
apply(Meta, {nodeup, Node}, #?MODULE{consumers = Cons0,
enqueuers = Enqs0,
- service_queue = SQ0} = State0) ->
+ service_queue = _SQ0} = State0) ->
%% A node we are monitoring has come back.
%% If we have suspected any processes of being
%% down we should now re-issue the monitors for them to detect if they're
@@ -436,44 +476,90 @@ apply(Meta, {nodeup, Node}, #?MODULE{consumers = Cons0,
end, Enqs0),
ConsumerUpdateActiveFun = consumer_active_flag_update_function(State0),
%% mark all consumers as up
- {Cons1, SQ, Effects1} =
- maps:fold(fun({_, P} = ConsumerId, C, {CAcc, SQAcc, EAcc})
+ {State1, Effects1} =
+ maps:fold(fun({_, P} = ConsumerId, C, {SAcc, EAcc})
when (node(P) =:= Node) and
(C#consumer.status =/= cancelled) ->
- EAcc1 = ConsumerUpdateActiveFun(State0, ConsumerId,
+ EAcc1 = ConsumerUpdateActiveFun(SAcc, ConsumerId,
C, true, up, EAcc),
- update_or_remove_sub(ConsumerId,
- C#consumer{status = up}, CAcc,
- SQAcc, EAcc1);
+ {update_or_remove_sub(Meta, ConsumerId,
+ C#consumer{status = up},
+ SAcc), EAcc1};
(_, _, Acc) ->
Acc
- end, {Cons0, SQ0, Monitors}, Cons0),
- Waiting = update_waiting_consumer_status(Node, State0, up),
- State1 = State0#?MODULE{consumers = Cons1,
+ end, {State0, Monitors}, Cons0),
+ Waiting = update_waiting_consumer_status(Node, State1, up),
+ State2 = State1#?MODULE{
enqueuers = Enqs1,
- service_queue = SQ,
waiting_consumers = Waiting},
- {State, Effects} = activate_next_consumer(State1, Effects1),
- checkout(Meta, State, Effects);
+ {State, Effects} = activate_next_consumer(State2, Effects1),
+ checkout(Meta, State0, State, Effects);
apply(_, {nodedown, _Node}, State) ->
{State, ok};
-apply(_, #purge_nodes{nodes = Nodes}, State0) ->
+apply(Meta, #purge_nodes{nodes = Nodes}, State0) ->
{State, Effects} = lists:foldl(fun(Node, {S, E}) ->
- purge_node(Node, S, E)
+ purge_node(Meta, Node, S, E)
end, {State0, []}, Nodes),
{State, ok, Effects};
apply(Meta, #update_config{config = Conf}, State) ->
- checkout(Meta, update_config(Conf, State), []).
-
-purge_node(Node, State, Effects) ->
+ checkout(Meta, State, update_config(Conf, State), []);
+apply(_Meta, {machine_version, 0, 1}, V0State) ->
+ State = convert_v0_to_v1(V0State),
+ {State, ok, []}.
+
+convert_v0_to_v1(V0State0) ->
+ V0State = rabbit_fifo_v0:normalize_for_v1(V0State0),
+ V0Msgs = rabbit_fifo_v0:get_field(messages, V0State),
+ V1Msgs = lqueue:from_list(lists:sort(maps:to_list(V0Msgs))),
+ V0Enqs = rabbit_fifo_v0:get_field(enqueuers, V0State),
+ V1Enqs = maps:map(
+ fun (_EPid, E) ->
+ #enqueuer{next_seqno = element(2, E),
+ pending = element(3, E),
+ status = element(4, E)}
+ end, V0Enqs),
+ Cfg = #cfg{name = rabbit_fifo_v0:get_cfg_field(name, V0State),
+ resource = rabbit_fifo_v0:get_cfg_field(resource, V0State),
+ release_cursor_interval = rabbit_fifo_v0:get_cfg_field(release_cursor_interval, V0State),
+ dead_letter_handler = rabbit_fifo_v0:get_cfg_field(dead_letter_handler, V0State),
+ become_leader_handler = rabbit_fifo_v0:get_cfg_field(become_leader_handler, V0State),
+ %% TODO: what if policy enabling reject_publish was applied before conversion?
+ overflow_strategy = drop_head,
+ max_length = rabbit_fifo_v0:get_cfg_field(max_length, V0State),
+ max_bytes = rabbit_fifo_v0:get_cfg_field(max_bytes, V0State),
+ consumer_strategy = rabbit_fifo_v0:get_cfg_field(consumer_strategy, V0State),
+ delivery_limit = rabbit_fifo_v0:get_cfg_field(delivery_limit, V0State),
+ max_in_memory_length = rabbit_fifo_v0:get_cfg_field(max_in_memory_length, V0State),
+ max_in_memory_bytes = rabbit_fifo_v0:get_cfg_field(max_in_memory_bytes, V0State)
+ },
+
+ #?MODULE{cfg = Cfg,
+ messages = V1Msgs,
+ next_msg_num = rabbit_fifo_v0:get_field(next_msg_num, V0State),
+ returns = rabbit_fifo_v0:get_field(returns, V0State),
+ enqueue_count = rabbit_fifo_v0:get_field(enqueue_count, V0State),
+ enqueuers = V1Enqs,
+ ra_indexes = rabbit_fifo_v0:get_field(ra_indexes, V0State),
+ release_cursors = rabbit_fifo_v0:get_field(release_cursors, V0State),
+ consumers = rabbit_fifo_v0:get_field(consumers, V0State),
+ service_queue = rabbit_fifo_v0:get_field(service_queue, V0State),
+ prefix_msgs = rabbit_fifo_v0:get_field(prefix_msgs, V0State),
+ msg_bytes_enqueue = rabbit_fifo_v0:get_field(msg_bytes_enqueue, V0State),
+ msg_bytes_checkout = rabbit_fifo_v0:get_field(msg_bytes_checkout, V0State),
+ waiting_consumers = rabbit_fifo_v0:get_field(waiting_consumers, V0State),
+ msg_bytes_in_memory = rabbit_fifo_v0:get_field(msg_bytes_in_memory, V0State),
+ msgs_ready_in_memory = rabbit_fifo_v0:get_field(msgs_ready_in_memory, V0State)
+ }.
+
+purge_node(Meta, Node, State, Effects) ->
lists:foldl(fun(Pid, {S0, E0}) ->
- {S, E} = handle_down(Pid, S0),
+ {S, E} = handle_down(Meta, Pid, S0),
{S, E0 ++ E}
end, {State, Effects}, all_pids_for(Node, State)).
%% any downs that are not noconnection
-handle_down(Pid, #?MODULE{consumers = Cons0,
- enqueuers = Enqs0} = State0) ->
+handle_down(Meta, Pid, #?MODULE{consumers = Cons0,
+ enqueuers = Enqs0} = State0) ->
% Remove any enqueuer for the same pid and enqueue any pending messages
% This should be ok as we won't see any more enqueues from this pid
State1 = case maps:take(Pid, Enqs0) of
@@ -490,7 +576,7 @@ handle_down(Pid, #?MODULE{consumers = Cons0,
DownConsumers = maps:keys(
maps:filter(fun({_, P}, _) -> P =:= Pid end, Cons0)),
lists:foldl(fun(ConsumerId, {S, E}) ->
- cancel_consumer(ConsumerId, S, E, down)
+ cancel_consumer(Meta, ConsumerId, S, E, down)
end, {State2, Effects1}, DownConsumers).
consumer_active_flag_update_function(#?MODULE{cfg = #cfg{consumer_strategy = competing}}) ->
@@ -582,19 +668,24 @@ state_enter(State, #?MODULE{cfg = #cfg{resource = _Resource}}) when State =/= le
-spec tick(non_neg_integer(), state()) -> ra_machine:effects().
-tick(_Ts, #?MODULE{cfg = #cfg{name = Name,
- resource = QName},
+tick(Ts, #?MODULE{cfg = #cfg{name = Name,
+ resource = QName},
msg_bytes_enqueue = EnqueueBytes,
msg_bytes_checkout = CheckoutBytes} = State) ->
- Metrics = {Name,
- messages_ready(State),
- num_checked_out(State), % checked out
- messages_total(State),
- query_consumer_count(State), % Consumers
- EnqueueBytes,
- CheckoutBytes},
- [{mod_call, rabbit_quorum_queue,
- handle_tick, [QName, Metrics, all_nodes(State)]}].
+ case is_expired(Ts, State) of
+ true ->
+ [{mod_call, rabbit_quorum_queue, spawn_deleter, [QName]}];
+ false ->
+ Metrics = {Name,
+ messages_ready(State),
+ num_checked_out(State), % checked out
+ messages_total(State),
+ query_consumer_count(State), % Consumers
+ EnqueueBytes,
+ CheckoutBytes},
+ [{mod_call, rabbit_quorum_queue,
+ handle_tick, [QName, Metrics, all_nodes(State)]}]
+ end.
-spec overview(state()) -> map().
overview(#?MODULE{consumers = Cons,
@@ -612,7 +703,10 @@ overview(#?MODULE{consumers = Cons,
max_bytes => Cfg#cfg.max_bytes,
consumer_strategy => Cfg#cfg.consumer_strategy,
max_in_memory_length => Cfg#cfg.max_in_memory_length,
- max_in_memory_bytes => Cfg#cfg.max_in_memory_bytes},
+ max_in_memory_bytes => Cfg#cfg.max_in_memory_bytes,
+ expires => Cfg#cfg.expires,
+ delivery_limit => Cfg#cfg.delivery_limit
+ },
#{type => ?MODULE,
config => Conf,
num_consumers => maps:size(Cons),
@@ -621,7 +715,8 @@ overview(#?MODULE{consumers = Cons,
num_ready_messages => messages_ready(State),
num_messages => messages_total(State),
num_release_cursors => lqueue:len(Cursors),
- release_crusor_enqueue_counter => EnqCount,
+ release_cursors => [I || {_, I, _} <- lqueue:to_list(Cursors)],
+ release_cursor_enqueue_counter => EnqCount,
enqueue_message_bytes => EnqueueBytes,
checkout_message_bytes => CheckoutBytes}.
@@ -637,6 +732,12 @@ get_checked_out(Cid, From, To, #?MODULE{consumers = Consumers}) ->
[]
end.
+-spec version() -> pos_integer().
+version() -> 1.
+
+which_module(0) -> rabbit_fifo_v0;
+which_module(1) -> ?MODULE.
+
-record(aux_gc, {last_raft_idx = 0 :: ra:index()}).
-record(aux, {name :: atom(),
utilisation :: term(),
@@ -804,7 +905,7 @@ messages_ready(#?MODULE{messages = M,
%% prefix messages will rarely have anything in them during normal
%% operations so length/1 is fine here
- maps:size(M) + lqueue:len(R) + RCnt + PCnt.
+ lqueue:len(M) + lqueue:len(R) + RCnt + PCnt.
messages_total(#?MODULE{ra_indexes = I,
prefix_msgs = {RCnt, _R, PCnt, _P}}) ->
@@ -843,17 +944,17 @@ num_checked_out(#?MODULE{consumers = Cons}) ->
maps:size(C) + Acc
end, 0, Cons).
-cancel_consumer(ConsumerId,
+cancel_consumer(Meta, ConsumerId,
#?MODULE{cfg = #cfg{consumer_strategy = competing}} = State,
Effects, Reason) ->
- cancel_consumer0(ConsumerId, State, Effects, Reason);
-cancel_consumer(ConsumerId,
+ cancel_consumer0(Meta, ConsumerId, State, Effects, Reason);
+cancel_consumer(Meta, ConsumerId,
#?MODULE{cfg = #cfg{consumer_strategy = single_active},
waiting_consumers = []} = State,
Effects, Reason) ->
%% single active consumer on, no consumers are waiting
- cancel_consumer0(ConsumerId, State, Effects, Reason);
-cancel_consumer(ConsumerId,
+ cancel_consumer0(Meta, ConsumerId, State, Effects, Reason);
+cancel_consumer(Meta, ConsumerId,
#?MODULE{consumers = Cons0,
cfg = #cfg{consumer_strategy = single_active},
waiting_consumers = Waiting0} = State0,
@@ -862,7 +963,7 @@ cancel_consumer(ConsumerId,
case maps:is_key(ConsumerId, Cons0) of
true ->
% The active consumer is to be removed
- {State1, Effects1} = cancel_consumer0(ConsumerId, State0,
+ {State1, Effects1} = cancel_consumer0(Meta, ConsumerId, State0,
Effects0, Reason),
activate_next_consumer(State1, Effects1);
false ->
@@ -886,11 +987,12 @@ consumer_update_active_effects(#?MODULE{cfg = #cfg{resource = QName}},
[QName, ConsumerId, false, Ack, Prefetch, Active, ActivityStatus, Args]}
| Effects].
-cancel_consumer0(ConsumerId, #?MODULE{consumers = C0} = S0, Effects0, Reason) ->
+cancel_consumer0(Meta, ConsumerId,
+ #?MODULE{consumers = C0} = S0, Effects0, Reason) ->
case C0 of
#{ConsumerId := Consumer} ->
- {S, Effects2} = maybe_return_all(ConsumerId, Consumer, S0,
- Effects0, Reason),
+ {S, Effects2} = maybe_return_all(Meta, ConsumerId, Consumer,
+ S0, Effects0, Reason),
%% The effects are emitted before the consumer is actually removed
%% if the consumer has unacked messages. This is a bit weird but
%% in line with what classic queues do (from an external point of
@@ -939,23 +1041,18 @@ activate_next_consumer(#?MODULE{consumers = Cons,
-maybe_return_all(ConsumerId, Consumer,
- #?MODULE{consumers = C0,
- service_queue = SQ0} = S0,
- Effects0, Reason) ->
+maybe_return_all(#{system_time := Ts} = Meta, ConsumerId, Consumer, S0, Effects0, Reason) ->
case Reason of
consumer_cancel ->
- {Cons, SQ, Effects1} =
- update_or_remove_sub(ConsumerId,
- Consumer#consumer{lifetime = once,
- credit = 0,
- status = cancelled},
- C0, SQ0, Effects0),
- {S0#?MODULE{consumers = Cons,
- service_queue = SQ}, Effects1};
+ {update_or_remove_sub(Meta, ConsumerId,
+ Consumer#consumer{lifetime = once,
+ credit = 0,
+ status = cancelled},
+ S0), Effects0};
down ->
- {S1, Effects1} = return_all(S0, Effects0, ConsumerId, Consumer),
- {S1#?MODULE{consumers = maps:remove(ConsumerId, S1#?MODULE.consumers)},
+ {S1, Effects1} = return_all(Meta, S0, Effects0, ConsumerId, Consumer),
+ {S1#?MODULE{consumers = maps:remove(ConsumerId, S1#?MODULE.consumers),
+ last_active = Ts},
Effects1}
end.
@@ -963,7 +1060,7 @@ apply_enqueue(#{index := RaftIdx} = Meta, From, Seq, RawMsg, State0) ->
case maybe_enqueue(RaftIdx, From, Seq, RawMsg, [], State0) of
{ok, State1, Effects1} ->
State2 = append_to_master_index(RaftIdx, State1),
- {State, ok, Effects} = checkout(Meta, State2, Effects1),
+ {State, ok, Effects} = checkout(Meta, State0, State2, Effects1),
{maybe_store_dehydrated_state(RaftIdx, State), ok, Effects};
{duplicate, State, Effects} ->
{State, ok, Effects}
@@ -993,7 +1090,6 @@ drop_head(#?MODULE{ra_indexes = Indexes0} = State0, Effects0) ->
end.
enqueue(RaftIdx, RawMsg, #?MODULE{messages = Messages,
- low_msg_num = LowMsgNum,
next_msg_num = NextMsgNum} = State0) ->
%% the initial header is an integer only - it will get expanded to a map
%% when the next required key is added
@@ -1008,10 +1104,7 @@ enqueue(RaftIdx, RawMsg, #?MODULE{messages = Messages,
{RaftIdx, {Header, RawMsg}}} % indexed message with header map
end,
State = add_bytes_enqueue(Header, State1),
- State#?MODULE{messages = Messages#{NextMsgNum => Msg},
- %% this is probably only done to record it when low_msg_num
- %% is undefined
- low_msg_num = min(LowMsgNum, NextMsgNum),
+ State#?MODULE{messages = lqueue:in({NextMsgNum, Msg}, Messages),
next_msg_num = NextMsgNum + 1}.
append_to_master_index(RaftIdx,
@@ -1021,21 +1114,15 @@ append_to_master_index(RaftIdx,
State#?MODULE{ra_indexes = Indexes}.
-incr_enqueue_count(#?MODULE{enqueue_count = C,
+incr_enqueue_count(#?MODULE{enqueue_count = EC,
cfg = #cfg{release_cursor_interval = {_Base, C}}
- } = State0) ->
+ } = State0) when EC >= C->
%% this will trigger a dehydrated version of the state to be stored
%% at this raft index for potential future snapshot generation
%% Q: Why don't we just stash the release cursor here?
%% A: Because it needs to be the very last thing we do and we
%% first need to run the checkout logic.
State0#?MODULE{enqueue_count = 0};
-incr_enqueue_count(#?MODULE{cfg = #cfg{release_cursor_interval = C} = Cfg}
- = State0)
- when is_integer(C) ->
- %% conversion to new release cursor interval format
- State = State0#?MODULE{cfg = Cfg#cfg{release_cursor_interval = {C, C}}},
- incr_enqueue_count(State);
incr_enqueue_count(#?MODULE{enqueue_count = C} = State) ->
State#?MODULE{enqueue_count = C + 1}.
@@ -1055,25 +1142,15 @@ maybe_store_dehydrated_state(RaftIdx,
0 -> 0;
_ ->
Total = messages_total(State0),
- min(max(Total, Base),
- ?RELEASE_CURSOR_EVERY_MAX)
+ min(max(Total, Base), ?RELEASE_CURSOR_EVERY_MAX)
end,
- State = convert_prefix_msgs(
- State0#?MODULE{cfg = Cfg#cfg{release_cursor_interval =
- {Base, Interval}}}),
+ State = State0#?MODULE{cfg = Cfg#cfg{release_cursor_interval =
+ {Base, Interval}}},
Dehydrated = dehydrate_state(State),
Cursor = {release_cursor, RaftIdx, Dehydrated},
Cursors = lqueue:in(Cursor, Cursors0),
State#?MODULE{release_cursors = Cursors}
end;
-maybe_store_dehydrated_state(RaftIdx,
- #?MODULE{cfg =
- #cfg{release_cursor_interval = C} = Cfg}
- = State0)
- when is_integer(C) ->
- %% convert to new format
- State = State0#?MODULE{cfg = Cfg#cfg{release_cursor_interval = {C, C}}},
- maybe_store_dehydrated_state(RaftIdx, State);
maybe_store_dehydrated_state(_RaftIdx, State) ->
State.
@@ -1122,57 +1199,50 @@ snd(T) ->
element(2, T).
return(#{index := IncomingRaftIdx} = Meta, ConsumerId, Returned,
- Effects0, #?MODULE{service_queue = SQ0} = State0) ->
+ Effects0, State0) ->
{State1, Effects1} = maps:fold(
fun(MsgId, {Tag, _} = Msg, {S0, E0})
when Tag == '$prefix_msg';
Tag == '$empty_msg'->
- return_one(MsgId, 0, Msg, S0, E0, ConsumerId);
+ return_one(Meta, MsgId, 0, Msg, S0, E0, ConsumerId);
(MsgId, {MsgNum, Msg}, {S0, E0}) ->
- return_one(MsgId, MsgNum, Msg, S0, E0,
+ return_one(Meta, MsgId, MsgNum, Msg, S0, E0,
ConsumerId)
end, {State0, Effects0}, Returned),
- {State2, Effects3} =
+ State2 =
case State1#?MODULE.consumers of
- #{ConsumerId := Con0} = Cons0 ->
+ #{ConsumerId := Con0} ->
Con = Con0#consumer{credit = increase_credit(Con0,
map_size(Returned))},
- {Cons, SQ, Effects2} = update_or_remove_sub(ConsumerId, Con,
- Cons0, SQ0, Effects1),
- {State1#?MODULE{consumers = Cons,
- service_queue = SQ}, Effects2};
+ update_or_remove_sub(Meta, ConsumerId, Con, State1);
_ ->
- {State1, Effects1}
+ State1
end,
- {State, ok, Effects} = checkout(Meta, State2, Effects3),
+ {State, ok, Effects} = checkout(Meta, State0, State2, Effects1),
update_smallest_raft_index(IncomingRaftIdx, State, Effects).
% used to process messages that are finished
-complete(ConsumerId, Discarded,
- #consumer{checked_out = Checked} = Con0, Effects0,
- #?MODULE{consumers = Cons0, service_queue = SQ0,
- ra_indexes = Indexes0} = State0) ->
+complete(Meta, ConsumerId, Discarded,
+ #consumer{checked_out = Checked} = Con0, Effects,
+ #?MODULE{ra_indexes = Indexes0} = State0) ->
%% TODO optimise use of Discarded map here
MsgRaftIdxs = [RIdx || {_, {RIdx, _}} <- maps:values(Discarded)],
%% credit_mode = simple_prefetch should automatically top-up credit
%% as messages are simple_prefetch or otherwise returned
Con = Con0#consumer{checked_out = maps:without(maps:keys(Discarded), Checked),
credit = increase_credit(Con0, map_size(Discarded))},
- {Cons, SQ, Effects} = update_or_remove_sub(ConsumerId, Con, Cons0,
- SQ0, Effects0),
+ State1 = update_or_remove_sub(Meta, ConsumerId, Con, State0),
Indexes = lists:foldl(fun rabbit_fifo_index:delete/2, Indexes0,
MsgRaftIdxs),
%% TODO: use maps:fold instead
- State1 = lists:foldl(fun({_, {_, {Header, _}}}, Acc) ->
+ State2 = lists:foldl(fun({_, {_, {Header, _}}}, Acc) ->
add_bytes_settle(Header, Acc);
({'$prefix_msg', Header}, Acc) ->
add_bytes_settle(Header, Acc);
({'$empty_msg', Header}, Acc) ->
add_bytes_settle(Header, Acc)
- end, State0, maps:values(Discarded)),
- {State1#?MODULE{consumers = Cons,
- ra_indexes = Indexes,
- service_queue = SQ}, Effects}.
+ end, State1, maps:values(Discarded)),
+ {State2#?MODULE{ra_indexes = Indexes}, Effects}.
increase_credit(#consumer{lifetime = once,
credit = Credit}, _) ->
@@ -1190,9 +1260,9 @@ complete_and_checkout(#{index := IncomingRaftIdx} = Meta, MsgIds, ConsumerId,
#consumer{checked_out = Checked0} = Con0,
Effects0, State0) ->
Discarded = maps:with(MsgIds, Checked0),
- {State2, Effects1} = complete(ConsumerId, Discarded, Con0,
+ {State2, Effects1} = complete(Meta, ConsumerId, Discarded, Con0,
Effects0, State0),
- {State, ok, Effects} = checkout(Meta, State2, Effects1),
+ {State, ok, Effects} = checkout(Meta, State0, State2, Effects1),
update_smallest_raft_index(IncomingRaftIdx, State, Effects).
dead_letter_effects(_Reason, _Discarded,
@@ -1228,8 +1298,12 @@ cancel_consumer_effects(ConsumerId,
[{mod_call, rabbit_quorum_queue,
cancel_consumer_handler, [QName, ConsumerId]} | Effects].
-update_smallest_raft_index(IncomingRaftIdx,
- #?MODULE{ra_indexes = Indexes,
+update_smallest_raft_index(Idx, State, Effects) ->
+ update_smallest_raft_index(Idx, ok, State, Effects).
+
+update_smallest_raft_index(IncomingRaftIdx, Reply,
+ #?MODULE{cfg = Cfg,
+ ra_indexes = Indexes,
release_cursors = Cursors0} = State0,
Effects) ->
case rabbit_fifo_index:size(Indexes) of
@@ -1237,18 +1311,22 @@ update_smallest_raft_index(IncomingRaftIdx,
% there are no messages on queue anymore and no pending enqueues
% we can forward release_cursor all the way until
% the last received command, hooray
- State = State0#?MODULE{release_cursors = lqueue:new()},
- {State, ok, Effects ++ [{release_cursor, IncomingRaftIdx, State}]};
+ %% reset the release cursor interval
+ #cfg{release_cursor_interval = {Base, _}} = Cfg,
+ RCI = {Base, Base},
+ State = State0#?MODULE{cfg = Cfg#cfg{release_cursor_interval = RCI},
+ release_cursors = lqueue:new(),
+ enqueue_count = 0},
+ {State, Reply, Effects ++ [{release_cursor, IncomingRaftIdx, State}]};
_ ->
Smallest = rabbit_fifo_index:smallest(Indexes),
case find_next_cursor(Smallest, Cursors0) of
{empty, Cursors} ->
- {State0#?MODULE{release_cursors = Cursors},
- ok, Effects};
+ {State0#?MODULE{release_cursors = Cursors}, Reply, Effects};
{Cursor, Cursors} ->
- %% we can emit a release cursor we've passed the smallest
+ %% we can emit a release cursor when we've passed the smallest
%% release cursor available.
- {State0#?MODULE{release_cursors = Cursors}, ok,
+ {State0#?MODULE{release_cursors = Cursors}, Reply,
Effects ++ [Cursor]}
end
end.
@@ -1272,7 +1350,7 @@ update_header(Key, UpdateFun, Default, Header) ->
maps:update_with(Key, UpdateFun, Default, Header).
-return_one(MsgId, 0, {Tag, Header0},
+return_one(Meta, MsgId, 0, {Tag, Header0},
#?MODULE{returns = Returns,
consumers = Consumers,
cfg = #cfg{delivery_limit = DeliveryLimit}} = State0,
@@ -1283,7 +1361,7 @@ return_one(MsgId, 0, {Tag, Header0},
Msg0 = {Tag, Header},
case maps:get(delivery_count, Header) of
DeliveryCount when DeliveryCount > DeliveryLimit ->
- complete(ConsumerId, #{MsgId => Msg0}, Con0, Effects0, State0);
+ complete(Meta, ConsumerId, #{MsgId => Msg0}, Con0, Effects0, State0);
_ ->
%% this should not affect the release cursor in any way
Con = Con0#consumer{checked_out = maps:remove(MsgId, Checked)},
@@ -1303,7 +1381,7 @@ return_one(MsgId, 0, {Tag, Header0},
returns = lqueue:in(Msg, Returns)}),
Effects0}
end;
-return_one(MsgId, MsgNum, {RaftId, {Header0, RawMsg}},
+return_one(Meta, MsgId, MsgNum, {RaftId, {Header0, RawMsg}},
#?MODULE{returns = Returns,
consumers = Consumers,
cfg = #cfg{delivery_limit = DeliveryLimit}} = State0,
@@ -1316,7 +1394,7 @@ return_one(MsgId, MsgNum, {RaftId, {Header0, RawMsg}},
DlMsg = {MsgNum, Msg0},
Effects = dead_letter_effects(delivery_limit, #{none => DlMsg},
State0, Effects0),
- complete(ConsumerId, #{MsgId => DlMsg}, Con0, Effects, State0);
+ complete(Meta, ConsumerId, #{MsgId => DlMsg}, Con0, Effects, State0);
_ ->
Con = Con0#consumer{checked_out = maps:remove(MsgId, Checked)},
%% this should not affect the release cursor in any way
@@ -1338,46 +1416,46 @@ return_one(MsgId, MsgNum, {RaftId, {Header0, RawMsg}},
Effects0}
end.
-return_all(#?MODULE{consumers = Cons} = State0, Effects0, ConsumerId,
+return_all(Meta, #?MODULE{consumers = Cons} = State0, Effects0, ConsumerId,
#consumer{checked_out = Checked0} = Con) ->
%% need to sort the list so that we return messages in the order
%% they were checked out
Checked = lists:sort(maps:to_list(Checked0)),
State = State0#?MODULE{consumers = Cons#{ConsumerId => Con}},
lists:foldl(fun ({MsgId, {'$prefix_msg', _} = Msg}, {S, E}) ->
- return_one(MsgId, 0, Msg, S, E, ConsumerId);
+ return_one(Meta, MsgId, 0, Msg, S, E, ConsumerId);
({MsgId, {'$empty_msg', _} = Msg}, {S, E}) ->
- return_one(MsgId, 0, Msg, S, E, ConsumerId);
+ return_one(Meta, MsgId, 0, Msg, S, E, ConsumerId);
({MsgId, {MsgNum, Msg}}, {S, E}) ->
- return_one(MsgId, MsgNum, Msg, S, E, ConsumerId)
+ return_one(Meta, MsgId, MsgNum, Msg, S, E, ConsumerId)
end, {State, Effects0}, Checked).
%% checkout new messages to consumers
-checkout(#{index := Index}, State0, Effects0) ->
- {State1, _Result, Effects1} = checkout0(checkout_one(State0),
+checkout(#{index := Index} = Meta, OldState, State0, Effects0) ->
+ {State1, _Result, Effects1} = checkout0(Meta, checkout_one(Meta, State0),
Effects0, {#{}, #{}}),
- case evaluate_limit(false, State1, Effects1) of
+ case evaluate_limit(Index, false, OldState, State1, Effects1) of
{State, true, Effects} ->
update_smallest_raft_index(Index, State, Effects);
{State, false, Effects} ->
{State, ok, Effects}
end.
-checkout0({success, ConsumerId, MsgId, {RaftIdx, {Header, 'empty'}}, State},
+checkout0(Meta, {success, ConsumerId, MsgId, {RaftIdx, {Header, 'empty'}}, State},
Effects, {SendAcc, LogAcc0}) ->
DelMsg = {RaftIdx, {MsgId, Header}},
LogAcc = maps:update_with(ConsumerId,
fun (M) -> [DelMsg | M] end,
[DelMsg], LogAcc0),
- checkout0(checkout_one(State), Effects, {SendAcc, LogAcc});
-checkout0({success, ConsumerId, MsgId, Msg, State}, Effects,
+ checkout0(Meta, checkout_one(Meta, State), Effects, {SendAcc, LogAcc});
+checkout0(Meta, {success, ConsumerId, MsgId, Msg, State}, Effects,
{SendAcc0, LogAcc}) ->
DelMsg = {MsgId, Msg},
SendAcc = maps:update_with(ConsumerId,
fun (M) -> [DelMsg | M] end,
[DelMsg], SendAcc0),
- checkout0(checkout_one(State), Effects, {SendAcc, LogAcc});
-checkout0({Activity, State0}, Effects0, {SendAcc, LogAcc}) ->
+ checkout0(Meta, checkout_one(Meta, State), Effects, {SendAcc, LogAcc});
+checkout0(_Meta, {Activity, State0}, Effects0, {SendAcc, LogAcc}) ->
Effects1 = case Activity of
nochange ->
append_send_msg_effects(
@@ -1389,17 +1467,54 @@ checkout0({Activity, State0}, Effects0, {SendAcc, LogAcc}) ->
end,
{State0, ok, lists:reverse(Effects1)}.
-evaluate_limit(Result,
+evaluate_limit(_Index, Result, _BeforeState,
#?MODULE{cfg = #cfg{max_length = undefined,
max_bytes = undefined}} = State,
Effects) ->
{State, Result, Effects};
-evaluate_limit(Result, State00, Effects0) ->
- State0 = convert_prefix_msgs(State00),
+evaluate_limit(Index, Result, BeforeState,
+ #?MODULE{cfg = #cfg{overflow_strategy = Strategy},
+ enqueuers = Enqs0} = State0,
+ Effects0) ->
case is_over_limit(State0) of
- true ->
+ true when Strategy == drop_head ->
{State, Effects} = drop_head(State0, Effects0),
- evaluate_limit(true, State, Effects);
+ evaluate_limit(Index, true, BeforeState, State, Effects);
+ true when Strategy == reject_publish ->
+ %% generate send_msg effect for each enqueuer to let them know
+ %% they need to block
+ {Enqs, Effects} =
+ maps:fold(
+ fun (P, #enqueuer{blocked = undefined} = E0, {Enqs, Acc}) ->
+ E = E0#enqueuer{blocked = Index},
+ {Enqs#{P => E},
+ [{send_msg, P, {queue_status, reject_publish},
+ [ra_event]} | Acc]};
+ (_P, _E, Acc) ->
+ Acc
+ end, {Enqs0, Effects0}, Enqs0),
+ {State0#?MODULE{enqueuers = Enqs}, Result, Effects};
+ false when Strategy == reject_publish ->
+ %% TODO: optimise as this case gets called for every command
+ %% pretty much
+ Before = is_below_soft_limit(BeforeState),
+ case {Before, is_below_soft_limit(State0)} of
+ {false, true} ->
+ %% we have dropped below the lower limit: unblock all enqueuers
+ {Enqs, Effects} =
+ maps:fold(
+ fun (P, #enqueuer{} = E0, {Enqs, Acc}) ->
+ E = E0#enqueuer{blocked = undefined},
+ {Enqs#{P => E},
+ [{send_msg, P, {queue_status, go}, [ra_event]}
+ | Acc]};
+ (_P, _E, Acc) ->
+ Acc
+ end, {Enqs0, Effects0}, Enqs0),
+ {State0#?MODULE{enqueuers = Enqs}, Result, Effects};
+ _ ->
+ {State0, Result, Effects0}
+ end;
false ->
{State0, Result, Effects0}
end.
@@ -1451,7 +1566,6 @@ take_next_msg(#?MODULE{prefix_msgs = {NumR, [Header | Rem], NumP, P}} = State) -
{{'$prefix_msg', Header},
State#?MODULE{prefix_msgs = {NumR-1, Rem, NumP, P}}};
take_next_msg(#?MODULE{returns = Returns,
- low_msg_num = Low0,
messages = Messages0,
prefix_msgs = {NumR, R, NumP, P}} = State) ->
%% use peek rather than out there as the most likely case is an empty
@@ -1461,21 +1575,11 @@ take_next_msg(#?MODULE{returns = Returns,
{NextMsg,
State#?MODULE{returns = lqueue:drop(Returns)}};
empty when P == [] ->
- case Low0 of
- undefined ->
+ case lqueue:out(Messages0) of
+ {empty, _} ->
empty;
- _ ->
- {Msg, Messages} = maps:take(Low0, Messages0),
- case maps:size(Messages) of
- 0 ->
- {{Low0, Msg},
- State#?MODULE{messages = Messages,
- low_msg_num = undefined}};
- _ ->
- {{Low0, Msg},
- State#?MODULE{messages = Messages,
- low_msg_num = Low0 + 1}}
- end
+ {{value, {_, _} = SeqMsg}, Messages} ->
+ {SeqMsg, State#?MODULE{messages = Messages }}
end;
empty ->
[Msg | Rem] = P,
@@ -1511,7 +1615,7 @@ reply_log_effect(RaftIdx, MsgId, Header, Ready, From) ->
{dequeue, {MsgId, {Header, Msg}}, Ready}}}]
end}.
-checkout_one(#?MODULE{service_queue = SQ0,
+checkout_one(Meta, #?MODULE{service_queue = SQ0,
messages = Messages0,
consumers = Cons0} = InitState) ->
case queue:peek(SQ0) of
@@ -1526,11 +1630,11 @@ checkout_one(#?MODULE{service_queue = SQ0,
%% no credit but was still on queue
%% can happen when draining
%% recurse without consumer on queue
- checkout_one(InitState#?MODULE{service_queue = SQ1});
+ checkout_one(Meta, InitState#?MODULE{service_queue = SQ1});
{ok, #consumer{status = cancelled}} ->
- checkout_one(InitState#?MODULE{service_queue = SQ1});
+ checkout_one(Meta, InitState#?MODULE{service_queue = SQ1});
{ok, #consumer{status = suspected_down}} ->
- checkout_one(InitState#?MODULE{service_queue = SQ1});
+ checkout_one(Meta, InitState#?MODULE{service_queue = SQ1});
{ok, #consumer{checked_out = Checked0,
next_msg_id = Next,
credit = Credit,
@@ -1540,11 +1644,9 @@ checkout_one(#?MODULE{service_queue = SQ0,
next_msg_id = Next + 1,
credit = Credit - 1,
delivery_count = DelCnt + 1},
- {Cons, SQ, []} = % we expect no effects
- update_or_remove_sub(ConsumerId, Con,
- Cons0, SQ1, []),
- State1 = State0#?MODULE{service_queue = SQ,
- consumers = Cons},
+ State1 = update_or_remove_sub(Meta,
+ ConsumerId, Con,
+ State0#?MODULE{service_queue = SQ1}),
{State, Msg} =
case ConsumerMsg of
{'$prefix_msg', Header} ->
@@ -1566,44 +1668,46 @@ checkout_one(#?MODULE{service_queue = SQ0,
{success, ConsumerId, Next, Msg, State};
error ->
%% consumer did not exist but was queued, recurse
- checkout_one(InitState#?MODULE{service_queue = SQ1})
+ checkout_one(Meta, InitState#?MODULE{service_queue = SQ1})
end;
empty ->
{nochange, InitState}
end;
empty ->
- case maps:size(Messages0) of
+ case lqueue:len(Messages0) of
0 -> {nochange, InitState};
_ -> {inactive, InitState}
end
end.
-update_or_remove_sub(ConsumerId, #consumer{lifetime = auto,
- credit = 0} = Con,
- Cons, ServiceQueue, Effects) ->
- {maps:put(ConsumerId, Con, Cons), ServiceQueue, Effects};
-update_or_remove_sub(ConsumerId, #consumer{lifetime = auto} = Con,
- Cons, ServiceQueue, Effects) ->
- {maps:put(ConsumerId, Con, Cons),
- uniq_queue_in(ConsumerId, ServiceQueue), Effects};
-update_or_remove_sub(ConsumerId, #consumer{lifetime = once,
+update_or_remove_sub(_Meta, ConsumerId, #consumer{lifetime = auto,
+ credit = 0} = Con,
+ #?MODULE{consumers = Cons} = State) ->
+ State#?MODULE{consumers = maps:put(ConsumerId, Con, Cons)};
+update_or_remove_sub(_Meta, ConsumerId, #consumer{lifetime = auto} = Con,
+ #?MODULE{consumers = Cons,
+ service_queue = ServiceQueue} = State) ->
+ State#?MODULE{consumers = maps:put(ConsumerId, Con, Cons),
+ service_queue = uniq_queue_in(ConsumerId, ServiceQueue)};
+update_or_remove_sub(#{system_time := Ts},
+ ConsumerId, #consumer{lifetime = once,
checked_out = Checked,
credit = 0} = Con,
- Cons, ServiceQueue, Effects) ->
- case maps:size(Checked) of
+ #?MODULE{consumers = Cons} = State) ->
+ case maps:size(Checked) of
0 ->
% we're done with this consumer
- % TODO: demonitor consumer pid but _only_ if there are no other
- % monitors for this pid
- {maps:remove(ConsumerId, Cons), ServiceQueue, Effects};
+ State#?MODULE{consumers = maps:remove(ConsumerId, Cons),
+ last_active = Ts};
_ ->
% there are unsettled items so need to keep around
- {maps:put(ConsumerId, Con, Cons), ServiceQueue, Effects}
+ State#?MODULE{consumers = maps:put(ConsumerId, Con, Cons)}
end;
-update_or_remove_sub(ConsumerId, #consumer{lifetime = once} = Con,
- Cons, ServiceQueue, Effects) ->
- {maps:put(ConsumerId, Con, Cons),
- uniq_queue_in(ConsumerId, ServiceQueue), Effects}.
+update_or_remove_sub(_Meta, ConsumerId, #consumer{lifetime = once} = Con,
+ #?MODULE{consumers = Cons,
+ service_queue = ServiceQueue} = State) ->
+ State#?MODULE{consumers = maps:put(ConsumerId, Con, Cons),
+ service_queue = uniq_queue_in(ConsumerId, ServiceQueue)}.
uniq_queue_in(Key, Queue) ->
% TODO: queue:member could surely be quite expensive, however the practical
@@ -1663,18 +1767,11 @@ maybe_queue_consumer(ConsumerId, #consumer{credit = Credit},
ServiceQueue0
end.
-convert_prefix_msgs(#?MODULE{prefix_msgs = {R, P}} = State) ->
- State#?MODULE{prefix_msgs = {length(R), R, length(P), P}};
-convert_prefix_msgs(State) ->
- State.
-
%% creates a dehydrated version of the current state to be cached and
%% potentially used to for a snaphot at a later point
dehydrate_state(#?MODULE{messages = Messages,
consumers = Consumers,
returns = Returns,
- low_msg_num = Low,
- next_msg_num = Next,
prefix_msgs = {PRCnt, PrefRet0, PPCnt, PrefMsg0},
waiting_consumers = Waiting0} = State) ->
RCnt = lqueue:len(Returns),
@@ -1691,34 +1788,33 @@ dehydrate_state(#?MODULE{messages = Messages,
[],
lqueue:to_list(Returns)),
PrefRet = PrefRet0 ++ PrefRet1,
- PrefMsgsSuff = dehydrate_messages(Low, Next - 1, Messages, []),
+ PrefMsgsSuff = dehydrate_messages(Messages, []),
%% prefix messages are not populated in normal operation only after
%% recovering from a snapshot
PrefMsgs = PrefMsg0 ++ PrefMsgsSuff,
Waiting = [{Cid, dehydrate_consumer(C)} || {Cid, C} <- Waiting0],
- State#?MODULE{messages = #{},
+ State#?MODULE{messages = lqueue:new(),
ra_indexes = rabbit_fifo_index:empty(),
release_cursors = lqueue:new(),
- low_msg_num = undefined,
consumers = maps:map(fun (_, C) ->
dehydrate_consumer(C)
end, Consumers),
returns = lqueue:new(),
prefix_msgs = {PRCnt + RCnt, PrefRet,
- PPCnt + maps:size(Messages), PrefMsgs},
+ PPCnt + lqueue:len(Messages), PrefMsgs},
waiting_consumers = Waiting}.
-dehydrate_messages(Low, Next, _Msgs, Acc)
- when Next < Low ->
- Acc;
-dehydrate_messages(Low, Next, Msgs, Acc0) ->
- Acc = case maps:get(Next, Msgs) of
- {_RaftIdx, {_, 'empty'} = Msg} ->
- [Msg | Acc0];
- {_RaftIdx, {Header, _}} ->
- [Header | Acc0]
- end,
- dehydrate_messages(Low, Next - 1, Msgs, Acc).
+%% TODO make body recursive to avoid allocating lists:reverse call
+dehydrate_messages(Msgs0, Acc0) ->
+ {OutRes, Msgs} = lqueue:out(Msgs0),
+ case OutRes of
+ {value, {_MsgId, {_RaftId, {_, 'empty'} = Msg}}} ->
+ dehydrate_messages(Msgs, [Msg | Acc0]);
+ {value, {_MsgId, {_RaftId, {Header, _}}}} ->
+ dehydrate_messages(Msgs, [Header | Acc0]);
+ empty ->
+ lists:reverse(Acc0)
+ end.
dehydrate_consumer(#consumer{checked_out = Checked0} = Con) ->
Checked = maps:map(fun (_, {'$prefix_msg', _} = M) ->
@@ -1733,8 +1829,10 @@ dehydrate_consumer(#consumer{checked_out = Checked0} = Con) ->
Con#consumer{checked_out = Checked}.
%% make the state suitable for equality comparison
-normalize(#?MODULE{release_cursors = Cursors} = State) ->
- State#?MODULE{release_cursors = lqueue:from_list(lqueue:to_list(Cursors))}.
+normalize(#?MODULE{messages = Messages,
+ release_cursors = Cursors} = State) ->
+ State#?MODULE{messages = lqueue:from_list(lqueue:to_list(Messages)),
+ release_cursors = lqueue:from_list(lqueue:to_list(Cursors))}.
is_over_limit(#?MODULE{cfg = #cfg{max_length = undefined,
max_bytes = undefined}}) ->
@@ -1742,12 +1840,30 @@ is_over_limit(#?MODULE{cfg = #cfg{max_length = undefined,
is_over_limit(#?MODULE{cfg = #cfg{max_length = MaxLength,
max_bytes = MaxBytes},
msg_bytes_enqueue = BytesEnq} = State) ->
-
messages_ready(State) > MaxLength orelse (BytesEnq > MaxBytes).
+is_below_soft_limit(#?MODULE{cfg = #cfg{max_length = undefined,
+ max_bytes = undefined}}) ->
+ false;
+is_below_soft_limit(#?MODULE{cfg = #cfg{max_length = MaxLength,
+ max_bytes = MaxBytes},
+ msg_bytes_enqueue = BytesEnq} = State) ->
+ is_below(MaxLength, messages_ready(State)) andalso
+ is_below(MaxBytes, BytesEnq).
+
+is_below(undefined, _Num) ->
+ true;
+is_below(Val, Num) when is_integer(Val) andalso is_integer(Num) ->
+ Num =< trunc(Val * ?LOW_LIMIT).
+
-spec make_enqueue(option(pid()), option(msg_seqno()), raw_msg()) -> protocol().
make_enqueue(Pid, Seq, Msg) ->
#enqueue{pid = Pid, seq = Seq, msg = Msg}.
+
+-spec make_register_enqueuer(pid()) -> protocol().
+make_register_enqueuer(Pid) ->
+ #register_enqueuer{pid = Pid}.
+
-spec make_checkout(consumer_id(),
checkout_spec(), consumer_meta()) -> protocol().
make_checkout(ConsumerId, Spec, Meta) ->
@@ -1755,7 +1871,7 @@ make_checkout(ConsumerId, Spec, Meta) ->
spec = Spec, meta = Meta}.
-spec make_settle(consumer_id(), [msg_id()]) -> protocol().
-make_settle(ConsumerId, MsgIds) ->
+make_settle(ConsumerId, MsgIds) when is_list(MsgIds) ->
#settle{consumer_id = ConsumerId, msg_ids = MsgIds}.
-spec make_return(consumer_id(), [msg_id()]) -> protocol().
@@ -1817,7 +1933,7 @@ add_bytes_settle(#{size := Bytes}, State) ->
add_bytes_return(Bytes,
#?MODULE{msg_bytes_checkout = Checkout,
- msg_bytes_enqueue = Enqueue} = State)
+ msg_bytes_enqueue = Enqueue} = State)
when is_integer(Bytes) ->
State#?MODULE{msg_bytes_checkout = Checkout - Bytes,
msg_bytes_enqueue = Enqueue + Bytes};
@@ -1913,3 +2029,18 @@ suspected_pids_for(Node, #?MODULE{consumers = Cons0,
[P | Acc];
(_, Acc) -> Acc
end, Enqs, WaitingConsumers0).
+
+is_expired(Ts, #?MODULE{cfg = #cfg{expires = Expires},
+ last_active = LastActive,
+ consumers = Consumers})
+ when is_number(LastActive) andalso is_number(Expires) ->
+ %% TODO: should it be active consumers?
+ Active = maps:filter(fun (_, #consumer{status = suspected_down}) ->
+ false;
+ (_, _) ->
+ true
+ end, Consumers),
+
+ Ts > (LastActive + Expires) andalso maps:size(Active) == 0;
+is_expired(_Ts, _State) ->
+ false.
diff --git a/src/rabbit_fifo.hrl b/src/rabbit_fifo.hrl
index ebbaa9e1eb..4c87167ea1 100644
--- a/src/rabbit_fifo.hrl
+++ b/src/rabbit_fifo.hrl
@@ -67,7 +67,7 @@
-type applied_mfa() :: {module(), atom(), list()}.
% represents a partially applied module call
--define(RELEASE_CURSOR_EVERY, 64000).
+-define(RELEASE_CURSOR_EVERY, 2048).
-define(RELEASE_CURSOR_EVERY_MAX, 3200000).
-define(USE_AVG_HALF_LIFE, 10000.0).
%% an average QQ without any message uses about 100KB so setting this limit
@@ -75,6 +75,7 @@
-define(GC_MEM_LIMIT_B, 2000000).
-define(MB, 1048576).
+-define(LOW_LIMIT, 0.8).
-record(consumer,
{meta = #{} :: consumer_meta(),
@@ -100,21 +101,29 @@
-type consumer_strategy() :: competing | single_active.
+-type milliseconds() :: non_neg_integer().
+
-record(enqueuer,
{next_seqno = 1 :: msg_seqno(),
% out of order enqueues - sorted list
pending = [] :: [{msg_seqno(), ra:index(), raw_msg()}],
- status = up :: up | suspected_down
+ status = up :: up |
+ suspected_down,
+ %% it is useful to have a record of when this was blocked
+ %% so that we can retry sending the block effect if
+ %% the publisher did not receive the initial one
+ blocked :: undefined | ra:index(),
+ unused_1,
+ unused_2
}).
-record(cfg,
{name :: atom(),
resource :: rabbit_types:r('queue'),
- release_cursor_interval ::
- undefined | non_neg_integer() |
- {non_neg_integer(), non_neg_integer()},
+ release_cursor_interval :: option({non_neg_integer(), non_neg_integer()}),
dead_letter_handler :: option(applied_mfa()),
become_leader_handler :: option(applied_mfa()),
+ overflow_strategy = drop_head :: drop_head | reject_publish,
max_length :: option(non_neg_integer()),
max_bytes :: option(non_neg_integer()),
%% whether single active consumer is on or not for this queue
@@ -122,7 +131,10 @@
%% the maximum number of unsuccessful delivery attempts permitted
delivery_limit :: option(non_neg_integer()),
max_in_memory_length :: option(non_neg_integer()),
- max_in_memory_bytes :: option(non_neg_integer())
+ max_in_memory_bytes :: option(non_neg_integer()),
+ expires :: undefined | milliseconds(),
+ unused_1,
+ unused_2
}).
-type prefix_msgs() :: {list(), list()} |
@@ -132,14 +144,10 @@
-record(rabbit_fifo,
{cfg :: #cfg{},
% unassigned messages
- messages = #{} :: #{msg_in_id() => indexed_msg()},
- % defines the lowest message in id available in the messages map
- % that isn't a return
- low_msg_num :: option(msg_in_id()),
- % defines the next message in id to be added to the messages map
+ messages = lqueue:new() :: lqueue:queue(),
+ % defines the next message id
next_msg_num = 1 :: msg_in_id(),
- % list of returned msg_in_ids - when checking out it picks from
- % this list first before taking low_msg_num
+ % queue of returned msg_in_ids - checkout picks from here first
returns = lqueue:new() :: lqueue:lqueue(prefix_msg() |
{msg_in_id(), indexed_msg()}),
% a counter of enqueues - used to trigger shadow copy points
@@ -178,7 +186,10 @@
%% used only when single active consumer is on
waiting_consumers = [] :: [{consumer_id(), consumer()}],
msg_bytes_in_memory = 0 :: non_neg_integer(),
- msgs_ready_in_memory = 0 :: non_neg_integer()
+ msgs_ready_in_memory = 0 :: non_neg_integer(),
+ last_active :: undefined | non_neg_integer(),
+ unused_1,
+ unused_2
}).
-type config() :: #{name := atom(),
@@ -190,5 +201,9 @@
max_bytes => non_neg_integer(),
max_in_memory_length => non_neg_integer(),
max_in_memory_bytes => non_neg_integer(),
+ overflow_strategy => drop_head | reject_publish,
single_active_consumer_on => boolean(),
- delivery_limit => non_neg_integer()}.
+ delivery_limit => non_neg_integer(),
+ expires => non_neg_integer(),
+ created => non_neg_integer()
+ }.
diff --git a/src/rabbit_fifo_client.erl b/src/rabbit_fifo_client.erl
index 2ac9f6787a..dc98210817 100644
--- a/src/rabbit_fifo_client.erl
+++ b/src/rabbit_fifo_client.erl
@@ -53,9 +53,17 @@
-record(consumer, {last_msg_id :: seq() | -1,
delivery_count = 0 :: non_neg_integer()}).
--record(state, {cluster_name :: cluster_name(),
- servers = [] :: [ra:server_id()],
+-record(cfg, {cluster_name :: cluster_name(),
+ servers = [] :: [ra:server_id()],
+ soft_limit = ?SOFT_LIMIT :: non_neg_integer(),
+ block_handler = fun() -> ok end :: fun(() -> term()),
+ unblock_handler = fun() -> ok end :: fun(() -> ok),
+ timeout :: non_neg_integer(),
+ version = 0 :: non_neg_integer()}).
+
+-record(state, {cfg :: #cfg{},
leader :: undefined | ra:server_id(),
+ queue_status :: undefined | go | reject_publish,
next_seq = 0 :: seq(),
%% Last applied is initialise to -1 to note that no command has yet been
%% applied, but allowing to resend messages if the first ones on the sequence
@@ -66,15 +74,11 @@
slow = false :: boolean(),
unsent_commands = #{} :: #{rabbit_fifo:consumer_id() =>
{[seq()], [seq()], [seq()]}},
- soft_limit = ?SOFT_LIMIT :: non_neg_integer(),
pending = #{} :: #{seq() =>
{term(), rabbit_fifo:command()}},
consumer_deliveries = #{} :: #{rabbit_fifo:consumer_tag() =>
#consumer{}},
- block_handler = fun() -> ok end :: fun(() -> term()),
- unblock_handler = fun() -> ok end :: fun(() -> ok),
- timer_state :: term(),
- timeout :: non_neg_integer()
+ timer_state :: term()
}).
-opaque state() :: #state{}.
@@ -102,22 +106,24 @@ init(ClusterName, Servers) ->
%% @param MaxPending size defining the max number of pending commands.
-spec init(cluster_name(), [ra:server_id()], non_neg_integer()) -> state().
init(ClusterName = #resource{}, Servers, SoftLimit) ->
- Timeout = application:get_env(kernel, net_ticktime, 60000) + 5000,
- #state{cluster_name = ClusterName,
- servers = Servers,
- soft_limit = SoftLimit,
- timeout = Timeout}.
+ Timeout = application:get_env(kernel, net_ticktime, 60) + 5,
+ #state{cfg = #cfg{cluster_name = ClusterName,
+ servers = Servers,
+ soft_limit = SoftLimit,
+ timeout = Timeout * 1000}}.
-spec init(cluster_name(), [ra:server_id()], non_neg_integer(), fun(() -> ok),
fun(() -> ok)) -> state().
init(ClusterName = #resource{}, Servers, SoftLimit, BlockFun, UnblockFun) ->
- Timeout = application:get_env(kernel, net_ticktime, 60000) + 5000,
- #state{cluster_name = ClusterName,
- servers = Servers,
- block_handler = BlockFun,
- unblock_handler = UnblockFun,
- soft_limit = SoftLimit,
- timeout = Timeout}.
+ %% net ticktime is in seconds
+ Timeout = application:get_env(kernel, net_ticktime, 60) + 5,
+ #state{cfg = #cfg{cluster_name = ClusterName,
+ servers = Servers,
+ block_handler = BlockFun,
+ unblock_handler = UnblockFun,
+ soft_limit = SoftLimit,
+ timeout = Timeout * 1000}}.
+
%% @doc Enqueues a message.
%% @param Correlation an arbitrary erlang term used to correlate this
@@ -132,10 +138,47 @@ init(ClusterName = #resource{}, Servers, SoftLimit, BlockFun, UnblockFun) ->
%% SequenceNumber can be correlated to the applied sequence numbers returned
%% by the {@link handle_ra_event/2. handle_ra_event/2} function.
-spec enqueue(Correlation :: term(), Msg :: term(), State :: state()) ->
- {ok | slow, state()}.
-enqueue(Correlation, Msg, State0 = #state{slow = Slow,
- block_handler = BlockFun}) ->
- Node = pick_node(State0),
+ {ok | slow | reject_publish, state()}.
+enqueue(Correlation, Msg,
+ #state{queue_status = undefined,
+ next_enqueue_seq = 1,
+ cfg = #cfg{timeout = Timeout}} = State0) ->
+ %% it is the first enqueue, check the version
+ {_, Node} = Server = pick_server(State0),
+ case rpc:call(Node, ra_machine, version, [{machine, rabbit_fifo, #{}}]) of
+ 0 ->
+ %% the leader is running the old version
+ %% so we can't initialize the enqueuer session safely
+ %% fall back on old behaviour
+ enqueue(Correlation, Msg, State0#state{queue_status = go});
+ 1 ->
+ %% we're running the new version on the leader, so do a synchronous
+ %% initialisation of the enqueuer session
+ Reg = rabbit_fifo:make_register_enqueuer(self()),
+ case ra:process_command(Server, Reg, Timeout) of
+ {ok, reject_publish, _} ->
+ {reject_publish, State0#state{queue_status = reject_publish}};
+ {ok, ok, _} ->
+ enqueue(Correlation, Msg, State0#state{queue_status = go});
+ {timeout, _} ->
+ %% if we timeout it is probably better to reject
+ %% the message than being uncertain
+ {reject_publish, State0};
+ Err ->
+ exit(Err)
+ end;
+ {badrpc, nodedown} ->
+ {reject_publish, State0}
+ end;
+enqueue(_Correlation, _Msg,
+ #state{queue_status = reject_publish,
+ cfg = #cfg{}} = State) ->
+ {reject_publish, State};
+enqueue(Correlation, Msg,
+ #state{slow = Slow,
+ queue_status = go,
+ cfg = #cfg{block_handler = BlockFun}} = State0) ->
+ Node = pick_server(State0),
{Next, State1} = next_enqueue_seq(State0),
% by default there is no correlation id
Cmd = rabbit_fifo:make_enqueue(self(), Next, Msg),
@@ -159,7 +202,7 @@ enqueue(Correlation, Msg, State0 = #state{slow = Slow,
%% by the {@link handle_ra_event/2. handle_ra_event/2} function.
%%
-spec enqueue(Msg :: term(), State :: state()) ->
- {ok | slow, state()}.
+ {ok | slow | reject_publish, state()}.
enqueue(Msg, State) ->
enqueue(undefined, Msg, State).
@@ -178,8 +221,9 @@ enqueue(Msg, State) ->
Settlement :: settled | unsettled, state()) ->
{ok, {rabbit_fifo:delivery_msg(), non_neg_integer()}
| empty, state()} | {error | timeout, term()}.
-dequeue(ConsumerTag, Settlement, #state{timeout = Timeout} = State0) ->
- Node = pick_node(State0),
+dequeue(ConsumerTag, Settlement,
+ #state{cfg = #cfg{timeout = Timeout}} = State0) ->
+ Node = pick_server(State0),
ConsumerId = consumer_id(ConsumerTag),
case ra:process_command(Node,
rabbit_fifo:make_checkout(ConsumerId,
@@ -189,7 +233,8 @@ dequeue(ConsumerTag, Settlement, #state{timeout = Timeout} = State0) ->
{ok, {dequeue, empty}, Leader} ->
{ok, empty, State0#state{leader = Leader}};
{ok, {dequeue, Msg, NumReady}, Leader} ->
- {ok, {Msg, NumReady}, State0#state{leader = Leader}};
+ {ok, {Msg, NumReady},
+ State0#state{leader = Leader}};
{ok, {error, _} = Err, _Leader} ->
Err;
Err ->
@@ -208,7 +253,7 @@ dequeue(ConsumerTag, Settlement, #state{timeout = Timeout} = State0) ->
-spec settle(rabbit_fifo:consumer_tag(), [rabbit_fifo:msg_id()], state()) ->
{ok, state()}.
settle(ConsumerTag, [_|_] = MsgIds, #state{slow = false} = State0) ->
- Node = pick_node(State0),
+ Node = pick_server(State0),
Cmd = rabbit_fifo:make_settle(consumer_id(ConsumerTag), MsgIds),
case send_command(Node, undefined, Cmd, normal, State0) of
{slow, S} ->
@@ -241,7 +286,7 @@ settle(ConsumerTag, [_|_] = MsgIds,
-spec return(rabbit_fifo:consumer_tag(), [rabbit_fifo:msg_id()], state()) ->
{ok, state()}.
return(ConsumerTag, [_|_] = MsgIds, #state{slow = false} = State0) ->
- Node = pick_node(State0),
+ Node = pick_server(State0),
% TODO: make rabbit_fifo return support lists of message ids
Cmd = rabbit_fifo:make_return(consumer_id(ConsumerTag), MsgIds),
case send_command(Node, undefined, Cmd, normal, State0) of
@@ -275,7 +320,7 @@ return(ConsumerTag, [_|_] = MsgIds,
-spec discard(rabbit_fifo:consumer_tag(), [rabbit_fifo:msg_id()], state()) ->
{ok | slow, state()}.
discard(ConsumerTag, [_|_] = MsgIds, #state{slow = false} = State0) ->
- Node = pick_node(State0),
+ Node = pick_server(State0),
Cmd = rabbit_fifo:make_discard(consumer_id(ConsumerTag), MsgIds),
case send_command(Node, undefined, Cmd, normal, State0) of
{slow, S} ->
@@ -363,7 +408,7 @@ credit(ConsumerTag, Credit, Drain,
%% the last received msgid provides us with the delivery count if we
%% add one as it is 0 indexed
C = maps:get(ConsumerTag, CDels, #consumer{last_msg_id = -1}),
- Node = pick_node(State0),
+ Node = pick_server(State0),
Cmd = rabbit_fifo:make_credit(ConsumerId, Credit,
C#consumer.last_msg_id + 1, Drain),
case send_command(Node, undefined, Cmd, normal, State0) of
@@ -429,11 +474,11 @@ stat(Leader, Timeout) ->
%% @doc returns the cluster name
-spec cluster_name(state()) -> cluster_name().
-cluster_name(#state{cluster_name = ClusterName}) ->
+cluster_name(#state{cfg = #cfg{cluster_name = ClusterName}}) ->
ClusterName.
-update_machine_state(Node, Conf) ->
- case ra:process_command(Node, rabbit_fifo:make_update_config(Conf)) of
+update_machine_state(Server, Conf) ->
+ case ra:process_command(Server, rabbit_fifo:make_update_config(Conf)) of
{ok, ok, _} ->
ok;
Err ->
@@ -487,10 +532,11 @@ update_machine_state(Node, Conf) ->
{internal, Correlators :: [term()], actions(), state()} |
{rabbit_fifo:client_msg(), state()} | eol.
handle_ra_event(From, {applied, Seqs},
- #state{soft_limit = SftLmt,
- unblock_handler = UnblockFun} = State0) ->
+ #state{cfg = #cfg{soft_limit = SftLmt,
+ unblock_handler = UnblockFun}} = State00) ->
+ State0 = State00#state{leader = From},
{Corrs, Actions, State1} = lists:foldl(fun seq_applied/2,
- {[], [], State0#state{leader = From}},
+ {[], [], State0},
Seqs),
case maps:size(State1#state.pending) < SftLmt of
true when State1#state.slow == true ->
@@ -511,7 +557,7 @@ handle_ra_event(From, {applied, Seqs},
add_command(Cid, discard,
Discards, Acc)))
end, [], State1#state.unsent_commands),
- Node = pick_node(State2),
+ Node = pick_server(State2),
%% send all the settlements and returns
State = lists:foldl(fun (C, S0) ->
case send_command(Node, undefined,
@@ -527,6 +573,10 @@ handle_ra_event(From, {applied, Seqs},
end;
handle_ra_event(From, {machine, {delivery, _ConsumerTag, _} = Del}, State0) ->
handle_delivery(From, Del, State0);
+handle_ra_event(_, {machine, {queue_status, Status}},
+ #state{} = State) ->
+ %% just set the queue status
+ {internal, [], [], State#state{queue_status = Status}};
handle_ra_event(Leader, {machine, leader_change},
#state{leader = Leader} = State) ->
%% leader already known
@@ -543,7 +593,7 @@ handle_ra_event(_From, {rejected, {not_leader, Leader, Seq}}, State0) ->
State1 = State0#state{leader = Leader},
State = resend(Seq, State1),
{internal, [], [], State};
-handle_ra_event(_, timeout, #state{servers = Servers} = State0) ->
+handle_ra_event(_, timeout, #state{cfg = #cfg{servers = Servers}} = State0) ->
case find_leader(Servers) of
undefined ->
%% still no leader, set the timer again
@@ -645,6 +695,7 @@ resend(OldSeq, #state{pending = Pending0, leader = Leader} = State) ->
resend_all_pending(#state{pending = Pend} = State) ->
Seqs = lists:sort(maps:keys(Pend)),
+ rabbit_log:info("rabbit_fifo_client: resend all pending ~w", [Seqs]),
lists:foldl(fun resend/2, State, Seqs).
handle_delivery(From, {delivery, Tag, [{FstId, _} | _] = IdMsgs} = Del0,
@@ -719,16 +770,19 @@ get_missing_deliveries(Leader, From, To, ConsumerTag) ->
[Leader])
end.
-pick_node(#state{leader = undefined, servers = [N | _]}) ->
+pick_server(#state{leader = undefined,
+ cfg = #cfg{servers = [N | _]}}) ->
%% TODO: pick random rather that first?
N;
-pick_node(#state{leader = Leader}) ->
+pick_server(#state{leader = Leader}) ->
Leader.
% servers sorted by last known leader
-sorted_servers(#state{leader = undefined, servers = Servers}) ->
+sorted_servers(#state{leader = undefined,
+ cfg = #cfg{servers = Servers}}) ->
Servers;
-sorted_servers(#state{leader = Leader, servers = Servers}) ->
+sorted_servers(#state{leader = Leader,
+ cfg = #cfg{servers = Servers}}) ->
[Leader | lists:delete(Leader, Servers)].
next_seq(#state{next_seq = Seq} = State) ->
@@ -742,7 +796,7 @@ consumer_id(ConsumerTag) ->
send_command(Server, Correlation, Command, Priority,
#state{pending = Pending,
- soft_limit = SftLmt} = State0) ->
+ cfg = #cfg{soft_limit = SftLmt}} = State0) ->
{Seq, State} = next_seq(State0),
ok = ra:pipeline_command(Server, Command, Seq, Priority),
Tag = case maps:size(Pending) >= SftLmt of
@@ -767,7 +821,7 @@ add_command(Cid, return, MsgIds, Acc) ->
add_command(Cid, discard, MsgIds, Acc) ->
[rabbit_fifo:make_discard(Cid, MsgIds) | Acc].
-set_timer(#state{servers = [Server | _]} = State) ->
+set_timer(#state{cfg = #cfg{servers = [Server | _]}} = State) ->
Ref = erlang:send_after(?TIMER_TIME, self(),
{ra_event, Server, timeout}),
State#state{timer_state = Ref}.
diff --git a/src/rabbit_fifo_v0.erl b/src/rabbit_fifo_v0.erl
new file mode 100644
index 0000000000..51f6bd133e
--- /dev/null
+++ b/src/rabbit_fifo_v0.erl
@@ -0,0 +1,1961 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at https://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(rabbit_fifo_v0).
+
+-behaviour(ra_machine).
+
+-compile(inline_list_funcs).
+-compile(inline).
+-compile({no_auto_import, [apply/3]}).
+
+-include("rabbit_fifo_v0.hrl").
+-include_lib("rabbit_common/include/rabbit.hrl").
+
+-export([
+ init/1,
+ apply/3,
+ state_enter/2,
+ tick/2,
+ overview/1,
+ get_checked_out/4,
+ %% aux
+ init_aux/1,
+ handle_aux/6,
+ % queries
+ query_messages_ready/1,
+ query_messages_checked_out/1,
+ query_messages_total/1,
+ query_processes/1,
+ query_ra_indexes/1,
+ query_consumer_count/1,
+ query_consumers/1,
+ query_stat/1,
+ query_single_active_consumer/1,
+ query_in_memory_usage/1,
+ usage/1,
+
+ zero/1,
+
+ %% misc
+ dehydrate_state/1,
+ normalize/1,
+ normalize_for_v1/1,
+ %% getters for conversions
+ get_field/2,
+ get_cfg_field/2,
+
+ %% protocol helpers
+ make_enqueue/3,
+ make_checkout/3,
+ make_settle/2,
+ make_return/2,
+ make_discard/2,
+ make_credit/4,
+ make_purge/0,
+ make_purge_nodes/1,
+ make_update_config/1
+ ]).
+
+%% command records representing all the protocol actions that are supported
+%% enqueue: add `msg' to the queue; apply/3 hands it to apply_enqueue/5
+-record(enqueue, {pid :: option(pid()),
+ seq :: option(msg_seqno()),
+ msg :: raw_msg()}).
+%% checkout: `spec' selects the action; apply/3 handles `{dequeue, _}',
+%% `cancel' and passes any other spec to update_consumer/4
+-record(checkout, {consumer_id :: consumer_id(),
+ spec :: checkout_spec(),
+ meta :: consumer_meta()}).
+%% settle: complete the given message ids for the consumer
+-record(settle, {consumer_id :: consumer_id(),
+ msg_ids :: [msg_id()]}).
+%% return: return the given checked out message ids for the consumer
+-record(return, {consumer_id :: consumer_id(),
+ msg_ids :: [msg_id()]}).
+%% discard: dead letter (reason `rejected') and complete the given message ids
+-record(discard, {consumer_id :: consumer_id(),
+ msg_ids :: [msg_id()]}).
+%% credit: grant credit to a consumer, optionally draining it
+-record(credit, {consumer_id :: consumer_id(),
+ credit :: non_neg_integer(),
+ delivery_count :: non_neg_integer(),
+ drain :: boolean()}).
+%% purge: drop all ready messages
+-record(purge, {}).
+%% purge_nodes: treat every known pid on the listed nodes as down
+-record(purge_nodes, {nodes :: [node()]}).
+%% update_config: apply a new configuration map to the running machine
+-record(update_config, {config :: config()}).
+
+-opaque protocol() ::
+ #enqueue{} |
+ #checkout{} |
+ #settle{} |
+ #return{} |
+ #discard{} |
+ #credit{} |
+ #purge{} |
+ #purge_nodes{} |
+ #update_config{}.
+
+-type command() :: protocol() | ra_machine:builtin_command().
+%% all the command types supported by ra fifo
+
+-type client_msg() :: delivery().
+%% the messages `rabbit_fifo' can send to consumers.
+
+-opaque state() :: #?STATE{}.
+
+-export_type([protocol/0,
+ delivery/0,
+ command/0,
+ credit_mode/0,
+ consumer_tag/0,
+ consumer_meta/0,
+ consumer_id/0,
+ client_msg/0,
+ msg/0,
+ msg_id/0,
+ msg_seqno/0,
+ delivery_msg/0,
+ state/0,
+ config/0]).
+
+%% Builds the initial machine state. `name' and `queue_resource' are
+%% mandatory keys; every other setting is filled in by update_config/2.
+-spec init(config()) -> state().
+init(#{name := QueueName, queue_resource := QueueResource} = Config) ->
+    BaseCfg = #cfg{name = QueueName, resource = QueueResource},
+    update_config(Config, #?STATE{cfg = BaseCfg}).
+
+%% Copies the optional settings found in `Conf' into the `#cfg{}' held by
+%% `State'. Settings missing from `Conf' fall back to their defaults.
+update_config(Conf, State) ->
+    Setting = fun(Key, Default) -> maps:get(Key, Conf, Default) end,
+    DeadLetterHandler = Setting(dead_letter_handler, undefined),
+    BecomeLeaderHandler = Setting(become_leader_handler, undefined),
+    BaseInterval = Setting(release_cursor_interval, ?RELEASE_CURSOR_EVERY),
+    MaxLength = Setting(max_length, undefined),
+    MaxBytes = Setting(max_bytes, undefined),
+    MaxInMemoryLength = Setting(max_in_memory_length, undefined),
+    MaxInMemoryBytes = Setting(max_in_memory_bytes, undefined),
+    DeliveryLimit = Setting(delivery_limit, undefined),
+    Strategy = case Setting(single_active_consumer_on, false) of
+                   true -> single_active;
+                   false -> competing
+               end,
+    OldCfg = State#?STATE.cfg,
+    %% keep the current interval if there is one, else start from the base
+    CurrentInterval = case OldCfg of
+                          #cfg{release_cursor_interval = {_, Current}} ->
+                              Current;
+                          #cfg{release_cursor_interval = undefined} ->
+                              BaseInterval;
+                          #cfg{release_cursor_interval = Current} ->
+                              Current
+                      end,
+    NewCfg = OldCfg#cfg{release_cursor_interval = {BaseInterval, CurrentInterval},
+                        dead_letter_handler = DeadLetterHandler,
+                        become_leader_handler = BecomeLeaderHandler,
+                        max_length = MaxLength,
+                        max_bytes = MaxBytes,
+                        max_in_memory_length = MaxInMemoryLength,
+                        max_in_memory_bytes = MaxInMemoryBytes,
+                        consumer_strategy = Strategy,
+                        delivery_limit = DeliveryLimit},
+    State#?STATE{cfg = NewCfg}.
+
+%% Returns 0 whatever the argument is.
+zero(_Ignored) ->
+    0.
+
+% msg_ids are scoped per consumer
+% ra_indexes holds all raft indexes for enqueues currently on queue
+%
+% apply/3 takes the command metadata, one command and the current state and
+% returns the new state and a reply, optionally followed by a list of effects.
+-spec apply(ra_machine:command_meta_data(), command(), state()) ->
+ {state(), Reply :: term(), ra_machine:effects()} |
+ {state(), Reply :: term()}.
+%% enqueue: all the work is done by apply_enqueue/5
+apply(Metadata, #enqueue{pid = From, seq = Seq,
+ msg = RawMsg}, State00) ->
+ apply_enqueue(Metadata, From, Seq, RawMsg, State00);
+%% settle: complete the given message ids for a known consumer; a settle
+%% for an unknown consumer id is ignored and replies `ok'
+apply(Meta,
+ #settle{msg_ids = MsgIds, consumer_id = ConsumerId},
+ #?STATE{consumers = Cons0} = State) ->
+ case Cons0 of
+ #{ConsumerId := Con0} ->
+ % need to increment metrics before completing as any snapshot
+ % states taken need to include them
+ complete_and_checkout(Meta, MsgIds, ConsumerId,
+ Con0, [], State);
+ _ ->
+ {State, ok}
+
+ end;
+%% discard: collect dead letter effects (reason `rejected') for the listed
+%% messages that are checked out to the consumer, then complete them;
+%% a discard for an unknown consumer id is ignored and replies `ok'
+apply(Meta, #discard{msg_ids = MsgIds, consumer_id = ConsumerId},
+ #?STATE{consumers = Cons0} = State0) ->
+ case Cons0 of
+ #{ConsumerId := Con0} ->
+ %% only ids actually present in checked_out are dead lettered
+ Discarded = maps:with(MsgIds, Con0#consumer.checked_out),
+ Effects = dead_letter_effects(rejected, Discarded, State0, []),
+ complete_and_checkout(Meta, MsgIds, ConsumerId, Con0,
+ Effects, State0);
+ _ ->
+ {State0, ok}
+ end;
+%% return: hand the listed messages that are checked out to the consumer to
+%% return/5; a return for an unknown consumer id is ignored and replies `ok'
+apply(Meta, #return{msg_ids = MsgIds, consumer_id = ConsumerId},
+ #?STATE{consumers = Cons0} = State) ->
+ case Cons0 of
+ #{ConsumerId := #consumer{checked_out = Checked0}} ->
+ %% ids that are not checked out to this consumer are dropped here
+ Returned = maps:with(MsgIds, Checked0),
+ return(Meta, ConsumerId, Returned, [], State);
+ _ ->
+ {State, ok}
+ end;
+%% credit: grant credit to an active consumer (followed by a checkout and an
+%% optional drain) or to a waiting consumer (no checkout); credit for an
+%% unknown consumer is ignored and replies `ok'
+apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt,
+ drain = Drain, consumer_id = ConsumerId},
+ #?STATE{consumers = Cons0,
+ service_queue = ServiceQueue0,
+ waiting_consumers = Waiting0} = State0) ->
+ case Cons0 of
+ #{ConsumerId := #consumer{delivery_count = DelCnt} = Con0} ->
+ %% this can go below 0 when credit is reduced
+ C = max(0, RemoteDelCnt + NewCredit - DelCnt),
+ %% grant the credit
+ Con1 = Con0#consumer{credit = C},
+ ServiceQueue = maybe_queue_consumer(ConsumerId, Con1,
+ ServiceQueue0),
+ Cons = maps:put(ConsumerId, Con1, Cons0),
+ {State1, ok, Effects} =
+ checkout(Meta, State0#?STATE{service_queue = ServiceQueue,
+ consumers = Cons}, []),
+ Response = {send_credit_reply, messages_ready(State1)},
+ %% by this point all checkouts for the updated credit value
+ %% should be processed so we can evaluate the drain
+ case Drain of
+ false ->
+ %% just return the result of the checkout
+ {State1, Response, Effects};
+ true ->
+ Con = #consumer{credit = PostCred} =
+ maps:get(ConsumerId, State1#?STATE.consumers),
+ %% add the outstanding credit to the delivery count
+ DeliveryCount = Con#consumer.delivery_count + PostCred,
+ Consumers = maps:put(ConsumerId,
+ Con#consumer{delivery_count = DeliveryCount,
+ credit = 0},
+ State1#?STATE.consumers),
+ %% the credit left after the checkout is what gets drained
+ Drained = Con#consumer.credit,
+ {CTag, _} = ConsumerId,
+ {State1#?STATE{consumers = Consumers},
+ %% returning a multi response with two client actions
+ %% for the channel to execute
+ {multi, [Response, {send_drained, {CTag, Drained}}]},
+ Effects}
+ end;
+ _ when Waiting0 /= [] ->
+ %% there are waiting consumers
+ case lists:keytake(ConsumerId, 1, Waiting0) of
+ {value, {_, Con0 = #consumer{delivery_count = DelCnt}}, Waiting} ->
+ %% the consumer is a waiting one
+ %% grant the credit
+ C = max(0, RemoteDelCnt + NewCredit - DelCnt),
+ Con = Con0#consumer{credit = C},
+ %% the updated consumer is put back at the head of the list
+ State = State0#?STATE{waiting_consumers =
+ [{ConsumerId, Con} | Waiting]},
+ {State, {send_credit_reply, messages_ready(State)}};
+ false ->
+ {State0, ok}
+ end;
+ _ ->
+ %% credit for unknown consumer - just ignore
+ {State0, ok}
+ end;
+%% dequeue is rejected when the single active consumer strategy is in use
+apply(_, #checkout{spec = {dequeue, _}},
+ #?STATE{cfg = #cfg{consumer_strategy = single_active}} = State0) ->
+ {State0, {error, unsupported}};
+%% dequeue: check out exactly one message for a one-off consumer.
+%% Replies `{dequeue, empty}' when nothing is ready or when the consumer id
+%% is already registered.
+apply(#{from := From} = Meta, #checkout{spec = {dequeue, Settlement},
+ meta = ConsumerMeta,
+ consumer_id = ConsumerId},
+ #?STATE{consumers = Consumers} = State0) ->
+ Exists = maps:is_key(ConsumerId, Consumers),
+ case messages_ready(State0) of
+ 0 ->
+ {State0, {dequeue, empty}};
+ _ when Exists ->
+ %% a dequeue using the same consumer_id isn't possible at this point
+ {State0, {dequeue, empty}};
+ Ready ->
+ State1 = update_consumer(ConsumerId, ConsumerMeta,
+ {once, 1, simple_prefetch},
+ State0),
+ {success, _, MsgId, Msg, State2} = checkout_one(State1),
+ {State, Effects} = case Settlement of
+ unsettled ->
+ {_, Pid} = ConsumerId,
+ {State2, [{monitor, process, Pid}]};
+ settled ->
+ %% immediately settle the checkout
+ {State3, _, Effects0} =
+ apply(Meta, make_settle(ConsumerId, [MsgId]),
+ State2),
+ {State3, Effects0}
+ end,
+ case Msg of
+ {RaftIdx, {Header, 'empty'}} ->
+ %% TODO add here new log effect with reply
+ %% NOTE(review): `Effects' bound above is not passed on in
+ %% this branch - confirm that this is intended
+ {State, '$ra_no_reply',
+ reply_log_effect(RaftIdx, MsgId, Header, Ready - 1, From)};
+ _ ->
+ {State, {dequeue, {MsgId, Msg}, Ready-1}, Effects}
+ end
+ end;
+%% checkout cancel: cancel the consumer (reason `consumer_cancel') and then
+%% run a checkout
+apply(Meta, #checkout{spec = cancel, consumer_id = ConsumerId}, State0) ->
+ {State, Effects} = cancel_consumer(ConsumerId, State0, [], consumer_cancel),
+ checkout(Meta, State, Effects);
+%% any other checkout spec: add or update the consumer, ask for its process
+%% to be monitored and run a checkout
+apply(Meta, #checkout{spec = Spec, meta = ConsumerMeta,
+ consumer_id = {_, Pid} = ConsumerId},
+ State0) ->
+ State1 = update_consumer(ConsumerId, ConsumerMeta, Spec, State0),
+ checkout(Meta, State1, [{monitor, process, Pid}]);
+%% purge: drop every ready message (both `messages' and `returns'), reset
+%% the related counters and reply `{purge, Total}' where Total is the
+%% number of ready messages before the purge
+apply(#{index := RaftIdx}, #purge{},
+ #?STATE{ra_indexes = Indexes0,
+ returns = Returns,
+ messages = Messages} = State0) ->
+ Total = messages_ready(State0),
+ %% remove the raft indexes of all ready messages from the index
+ Indexes1 = lists:foldl(fun rabbit_fifo_index:delete/2, Indexes0,
+ [I || {I, _} <- lists:sort(maps:values(Messages))]),
+ Indexes = lists:foldl(fun rabbit_fifo_index:delete/2, Indexes1,
+ [I || {_, {I, _}} <- lqueue:to_list(Returns)]),
+ {State, _, Effects} =
+ update_smallest_raft_index(RaftIdx,
+ State0#?STATE{ra_indexes = Indexes,
+ messages = #{},
+ returns = lqueue:new(),
+ msg_bytes_enqueue = 0,
+ prefix_msgs = {0, [], 0, []},
+ low_msg_num = undefined,
+ msg_bytes_in_memory = 0,
+ msgs_ready_in_memory = 0},
+ []),
+ %% as we're not checking out after a purge (no point) we have to
+ %% reverse the effects ourselves
+ {State, {purge, Total},
+ lists:reverse([garbage_collection | Effects])};
+%% down with `noconnection', single active consumer strategy: consumers on
+%% the disconnected node are moved to the waiting list, their checked out
+%% messages are returned and a new active consumer is selected
+apply(Meta, {down, Pid, noconnection},
+ #?STATE{consumers = Cons0,
+ cfg = #cfg{consumer_strategy = single_active},
+ waiting_consumers = Waiting0,
+ enqueuers = Enqs0} = State0) ->
+ Node = node(Pid),
+ %% if the pid refers to an active or cancelled consumer,
+ %% mark it as suspected and return it to the waiting queue
+ {State1, Effects0} =
+ maps:fold(fun({_, P} = Cid, C0, {S0, E0})
+ when node(P) =:= Node ->
+ %% the consumer should be returned to waiting
+ %% and checked out messages should be returned
+ Effs = consumer_update_active_effects(
+ S0, Cid, C0, false, suspected_down, E0),
+ Checked = C0#consumer.checked_out,
+ Credit = increase_credit(C0, maps:size(Checked)),
+ {St, Effs1} = return_all(S0, Effs,
+ Cid, C0#consumer{credit = Credit}),
+ %% if the consumer was cancelled there is a chance it got
+ %% removed when returning hence we need to be defensive here
+ %% NOTE(review): Waiting is built from Waiting0 (bound in the
+ %% clause head), not from the waiting list of the accumulator
+ %% state - confirm this is intended when more than one
+ %% consumer matches the node
+ Waiting = case St#?STATE.consumers of
+ #{Cid := C} ->
+ Waiting0 ++ [{Cid, C}];
+ _ ->
+ Waiting0
+ end,
+ {St#?STATE{consumers = maps:remove(Cid, St#?STATE.consumers),
+ waiting_consumers = Waiting},
+ Effs1};
+ (_, _, S) ->
+ S
+ end, {State0, []}, Cons0),
+ WaitingConsumers = update_waiting_consumer_status(Node, State1,
+ suspected_down),
+
+ %% select a new consumer from the waiting queue and run a checkout
+ State2 = State1#?STATE{waiting_consumers = WaitingConsumers},
+ {State, Effects1} = activate_next_consumer(State2, Effects0),
+
+ %% mark any enqueuers as suspected
+ Enqs = maps:map(fun(P, E) when node(P) =:= Node ->
+ E#enqueuer{status = suspected_down};
+ (_, E) -> E
+ end, Enqs0),
+ Effects = [{monitor, node, Node} | Effects1],
+ checkout(Meta, State#?STATE{enqueuers = Enqs}, Effects);
+%% down with `noconnection', any other consumer strategy
+apply(Meta, {down, Pid, noconnection},
+ #?STATE{consumers = Cons0,
+ enqueuers = Enqs0} = State0) ->
+ %% A node has been disconnected. This doesn't necessarily mean that
+ %% any processes on this node are down, they _may_ come back so here
+ %% we just mark them as suspected (effectively deactivated)
+ %% and return all checked out messages to the main queue for delivery to any
+ %% live consumers
+ %%
+ %% all pids for the disconnected node will be marked as suspected not just
+ %% the one we got the `down' command for
+ Node = node(Pid),
+
+ {State, Effects1} =
+ maps:fold(
+ %% only consumers on the node whose status is `up' are touched
+ fun({_, P} = Cid, #consumer{checked_out = Checked0,
+ status = up} = C0,
+ {St0, Eff}) when node(P) =:= Node ->
+ Credit = increase_credit(C0, map_size(Checked0)),
+ C = C0#consumer{status = suspected_down,
+ credit = Credit},
+ {St, Eff0} = return_all(St0, Eff, Cid, C),
+ Eff1 = consumer_update_active_effects(St, Cid, C, false,
+ suspected_down, Eff0),
+ {St, Eff1};
+ (_, _, {St, Eff}) ->
+ {St, Eff}
+ end, {State0, []}, Cons0),
+ Enqs = maps:map(fun(P, E) when node(P) =:= Node ->
+ E#enqueuer{status = suspected_down};
+ (_, E) -> E
+ end, Enqs0),
+
+ % Monitor the node so that we can "unsuspect" these processes when the node
+ % comes back, then re-issue all monitors and discover the final fate of
+ % these processes
+ Effects = case maps:size(State#?STATE.consumers) of
+ 0 ->
+ [{aux, inactive}, {monitor, node, Node}];
+ _ ->
+ [{monitor, node, Node}]
+ end ++ Effects1,
+ checkout(Meta, State#?STATE{enqueuers = Enqs}, Effects);
+%% down for any other reason: handle_down/2 cleans up after the pid, then a
+%% checkout is run
+apply(Meta, {down, Pid, _Info}, State0) ->
+ {State, Effects} = handle_down(Pid, State0),
+ checkout(Meta, State, Effects);
+%% nodeup: processes on the node are marked `up' again and re-monitored
+apply(Meta, {nodeup, Node}, #?STATE{consumers = Cons0,
+ enqueuers = Enqs0,
+ service_queue = SQ0} = State0) ->
+ %% A node we are monitoring has come back.
+ %% If we have suspected any processes of being
+ %% down we should now re-issue the monitors for them to detect if they're
+ %% actually down or not
+ Monitors = [{monitor, process, P}
+ || P <- suspected_pids_for(Node, State0)],
+
+ Enqs1 = maps:map(fun(P, E) when node(P) =:= Node ->
+ E#enqueuer{status = up};
+ (_, E) -> E
+ end, Enqs0),
+ ConsumerUpdateActiveFun = consumer_active_flag_update_function(State0),
+ %% mark all consumers as up
+ %% (cancelled consumers are left untouched by the guard below)
+ {Cons1, SQ, Effects1} =
+ maps:fold(fun({_, P} = ConsumerId, C, {CAcc, SQAcc, EAcc})
+ when (node(P) =:= Node) and
+ (C#consumer.status =/= cancelled) ->
+ EAcc1 = ConsumerUpdateActiveFun(State0, ConsumerId,
+ C, true, up, EAcc),
+ update_or_remove_sub(ConsumerId,
+ C#consumer{status = up}, CAcc,
+ SQAcc, EAcc1);
+ (_, _, Acc) ->
+ Acc
+ end, {Cons0, SQ0, Monitors}, Cons0),
+ Waiting = update_waiting_consumer_status(Node, State0, up),
+ State1 = State0#?STATE{consumers = Cons1,
+ enqueuers = Enqs1,
+ service_queue = SQ,
+ waiting_consumers = Waiting},
+ {State, Effects} = activate_next_consumer(State1, Effects1),
+ checkout(Meta, State, Effects);
+%% nodedown: nothing to do, the state is returned unchanged
+apply(_, {nodedown, _Node}, State) ->
+ {State, ok};
+%% purge_nodes: run purge_node/3 for each listed node, accumulating effects
+apply(_, #purge_nodes{nodes = Nodes}, State0) ->
+ {State, Effects} = lists:foldl(fun(Node, {S, E}) ->
+ purge_node(Node, S, E)
+ end, {State0, []}, Nodes),
+ {State, ok, Effects};
+%% update_config: apply the new configuration and then run a checkout
+apply(Meta, #update_config{config = Conf}, State) ->
+ checkout(Meta, update_config(Conf, State), []).
+
+%% Runs handle_down/2 for every pid returned by all_pids_for/2 for `Node',
+%% appending the effects of each call to the ones collected so far.
+purge_node(Node, State, Effects) ->
+    Pids = all_pids_for(Node, State),
+    DownOne = fun(Pid, {StateAcc, EffectsAcc}) ->
+                      {NextState, NewEffects} = handle_down(Pid, StateAcc),
+                      {NextState, EffectsAcc ++ NewEffects}
+              end,
+    lists:foldl(DownOne, {State, Effects}, Pids).
+
+%% any downs that are not noconnection
+%% Returns `{State, Effects}' after removing the enqueuer for `Pid', any
+%% waiting consumers owned by `Pid' and every consumer owned by `Pid'.
+handle_down(Pid, #?STATE{consumers = Cons0,
+ enqueuers = Enqs0} = State0) ->
+ % Remove any enqueuer for the same pid and enqueue any pending messages
+ % This should be ok as we won't see any more enqueues from this pid
+ State1 = case maps:take(Pid, Enqs0) of
+ {#enqueuer{pending = Pend}, Enqs} ->
+ lists:foldl(fun ({_, RIdx, RawMsg}, S) ->
+ enqueue(RIdx, RawMsg, S)
+ end, State0#?STATE{enqueuers = Enqs}, Pend);
+ error ->
+ State0
+ end,
+ {Effects1, State2} = handle_waiting_consumer_down(Pid, State1),
+ % return checked out messages to main queue
+ % Find the consumers for the down pid
+ DownConsumers = maps:keys(
+ maps:filter(fun({_, P}, _) -> P =:= Pid end, Cons0)),
+ lists:foldl(fun(ConsumerId, {S, E}) ->
+ cancel_consumer(ConsumerId, S, E, down)
+ end, {State2, Effects1}, DownConsumers).
+
+%% Returns the 6-arity fun used to produce "consumer active" effects.
+%% With competing consumers it is consumer_update_active_effects/6; with a
+%% single active consumer the effects are passed through untouched.
+consumer_active_flag_update_function(
+  #?STATE{cfg = #cfg{consumer_strategy = competing}}) ->
+    fun consumer_update_active_effects/6;
+consumer_active_flag_update_function(
+  #?STATE{cfg = #cfg{consumer_strategy = single_active}}) ->
+    fun(_State, _ConsumerId, _Consumer, _Active, _ActivityStatus, Effects) ->
+            Effects
+    end.
+
+%% Drops the waiting consumers owned by `Pid' and returns
+%% `{CancelEffects, State}'. Only does anything for the single active
+%% consumer strategy with a non-empty waiting list.
+handle_waiting_consumer_down(
+  _Pid, #?STATE{cfg = #cfg{consumer_strategy = competing}} = State) ->
+    {[], State};
+handle_waiting_consumer_down(
+  _Pid, #?STATE{cfg = #cfg{consumer_strategy = single_active},
+                waiting_consumers = []} = State) ->
+    {[], State};
+handle_waiting_consumer_down(
+  Pid, #?STATE{cfg = #cfg{consumer_strategy = single_active},
+               waiting_consumers = Waiting} = State) ->
+    %% split the waiting list into entries owned by Pid and the rest
+    OwnedByPid = fun({{_, Owner}, _}) -> Owner =:= Pid end,
+    {Down, StillUp} = lists:partition(OwnedByPid, Waiting),
+    %% cancel effects are computed against the state before removal
+    CancelEffects =
+        lists:foldl(fun({ConsumerId, _}, Acc) ->
+                            cancel_consumer_effects(ConsumerId, State, Acc)
+                    end, [], Down),
+    {CancelEffects, State#?STATE{waiting_consumers = StillUp}}.
+
+%% Returns the waiting consumer list without cancelled consumers and with
+%% the status of every consumer whose pid lives on `Node' set to `Status'.
+update_waiting_consumer_status(Node,
+                               #?STATE{waiting_consumers = Waiting},
+                               Status) ->
+    SetStatus = fun(Pid, Con) ->
+                        case node(Pid) =:= Node of
+                            true -> Con#consumer{status = Status};
+                            false -> Con
+                        end
+                end,
+    [{Cid, SetStatus(Pid, Con)}
+     || {{_, Pid} = Cid, #consumer{status = Current} = Con} <- Waiting,
+        Current =/= cancelled].
+
+%% Effects to emit when the ra server enters a new state.
+-spec state_enter(ra_server:ra_state(), state()) -> ra_machine:effects().
+state_enter(leader, #?STATE{consumers = Consumers,
+                            enqueuers = Enqueuers,
+                            waiting_consumers = Waiting,
+                            cfg = #cfg{name = Name,
+                                       resource = Resource,
+                                       become_leader_handler = Handler},
+                            prefix_msgs = {0, [], 0, []}}) ->
+    %% every enqueuer and consumer pid (active or waiting), deduplicated
+    EnqueuerPids = maps:keys(Enqueuers),
+    ConsumerPids = [Pid || {_, Pid} <- maps:keys(Consumers)],
+    WaitingPids = [Pid || {{_, Pid}, _} <- Waiting],
+    Pids = lists:usort(EnqueuerPids ++ ConsumerPids ++ WaitingPids),
+    ProcessMonitors = [{monitor, process, Pid} || Pid <- Pids],
+    Notifications = [{send_msg, Pid, leader_change, ra_event} || Pid <- Pids],
+    NodeMonitors = lists:usort([{monitor, node, node(Pid)} || Pid <- Pids]),
+    Reservation = {mod_call, rabbit_quorum_queue,
+                   file_handle_leader_reservation, [Resource]},
+    Effects = ProcessMonitors ++ Notifications ++ NodeMonitors ++ [Reservation],
+    %% the optional become-leader handler is called with the queue name
+    %% appended to its arguments
+    case Handler of
+        {Mod, Fun, Args} ->
+            [{mod_call, Mod, Fun, Args ++ [Name]} | Effects];
+        undefined ->
+            Effects
+    end;
+state_enter(eol, #?STATE{enqueuers = Enqueuers,
+                         consumers = Consumers,
+                         waiting_consumers = Waiting}) ->
+    %% key everything by pid so each process is notified once
+    ByPid = fun({_, Pid}, Val, Acc) -> Acc#{Pid => Val} end,
+    ActiveByPid = maps:fold(ByPid, #{}, Consumers),
+    WaitingByPid = lists:foldl(fun({Cid, Val}, Acc) ->
+                                       ByPid(Cid, Val, Acc)
+                               end, #{}, Waiting),
+    ConsumersByPid = maps:merge(ActiveByPid, WaitingByPid),
+    Recipients = maps:keys(maps:merge(Enqueuers, ConsumersByPid)),
+    [{send_msg, Pid, eol, ra_event} || Pid <- Recipients] ++
+        [{mod_call, rabbit_quorum_queue, file_handle_release_reservation, []}];
+state_enter(RaState, #?STATE{cfg = #cfg{resource = _Resource}})
+  when RaState =/= leader ->
+    [{mod_call, rabbit_quorum_queue, file_handle_other_reservation, []}];
+state_enter(_, _) ->
+    %% catch all as not handling all states
+    [].
+
+
+%% Periodic callback: returns a single effect that passes the current queue
+%% metrics and node list to rabbit_quorum_queue:handle_tick/3.
+-spec tick(non_neg_integer(), state()) -> ra_machine:effects().
+tick(_Ts, #?STATE{cfg = #cfg{name = Name,
+                             resource = QName},
+                  msg_bytes_enqueue = EnqueueBytes,
+                  msg_bytes_checkout = CheckoutBytes} = State) ->
+    Ready = messages_ready(State),
+    CheckedOut = num_checked_out(State),
+    Total = messages_total(State),
+    ConsumerCount = query_consumer_count(State),
+    Metrics = {Name, Ready, CheckedOut, Total, ConsumerCount,
+               EnqueueBytes, CheckoutBytes},
+    Nodes = all_nodes(State),
+    [{mod_call, rabbit_quorum_queue, handle_tick, [QName, Metrics, Nodes]}].
+
+%% Summarises the machine state as a map: the effective configuration plus
+%% consumer, enqueuer, message and byte counters.
+-spec overview(state()) -> map().
+overview(#?STATE{consumers = Consumers,
+                 enqueuers = Enqueuers,
+                 release_cursors = Cursors,
+                 enqueue_count = EnqueueCount,
+                 msg_bytes_enqueue = EnqueueBytes,
+                 msg_bytes_checkout = CheckoutBytes,
+                 cfg = Cfg} = State) ->
+    DeadLettering = undefined =/= Cfg#cfg.dead_letter_handler,
+    Config = #{name => Cfg#cfg.name,
+               resource => Cfg#cfg.resource,
+               release_cursor_interval => Cfg#cfg.release_cursor_interval,
+               dead_lettering_enabled => DeadLettering,
+               max_length => Cfg#cfg.max_length,
+               max_bytes => Cfg#cfg.max_bytes,
+               consumer_strategy => Cfg#cfg.consumer_strategy,
+               max_in_memory_length => Cfg#cfg.max_in_memory_length,
+               max_in_memory_bytes => Cfg#cfg.max_in_memory_bytes},
+    #{type => ?MODULE,
+      config => Config,
+      num_consumers => maps:size(Consumers),
+      num_checked_out => num_checked_out(State),
+      num_enqueuers => maps:size(Enqueuers),
+      num_ready_messages => messages_ready(State),
+      num_messages => messages_total(State),
+      num_release_cursors => lqueue:len(Cursors),
+      release_crusor_enqueue_counter => EnqueueCount,
+      enqueue_message_bytes => EnqueueBytes,
+      checkout_message_bytes => CheckoutBytes}.
+
+%% Returns `{MsgId, Msg}' pairs for the message ids in `From..To' that are
+%% checked out to consumer `Cid', in ascending id order. An unknown
+%% consumer yields an empty list.
+-spec get_checked_out(consumer_id(), msg_id(), msg_id(), state()) ->
+    [delivery_msg()].
+get_checked_out(Cid, From, To, #?STATE{consumers = Consumers}) ->
+    case Consumers of
+        #{Cid := #consumer{checked_out = Checked}} ->
+            %% ids in the range that are not checked out are skipped
+            [{MsgId, snd(snd(Entry))}
+             || MsgId <- lists:seq(From, To),
+                {ok, Entry} <- [maps:find(MsgId, Checked)]];
+        _ ->
+            []
+    end.
+
+%% aux_gc: the last raft index seen when eval_gc/3 ran a full GC sweep
+-record(aux_gc, {last_raft_idx = 0 :: ra:index()}).
+%% aux: auxiliary state built by init_aux/1 and updated by handle_aux/6;
+%% `utilisation' holds the tuple maintained by update_use/2
+-record(aux, {name :: atom(),
+ utilisation :: term(),
+ gc = #aux_gc{} :: #aux_gc{}}).
+
+%% Creates the `rabbit_fifo_usage' ets table and returns the initial aux
+%% state for `Name', with utilisation starting out as inactive.
+init_aux(Name) when is_atom(Name) ->
+    %% TODO: catch specific exception throw if table already exists
+    ok = ra_machine_ets:create_table(rabbit_fifo_usage,
+                                     [named_table, set, public,
+                                      {write_concurrency, true}]),
+    %% `microsecond' replaces the deprecated `micro_seconds' unit name; both
+    %% denote the same unit so timestamps stay comparable
+    Now = erlang:monotonic_time(microsecond),
+    #aux{name = Name,
+         utilisation = {inactive, Now, 1, 1.0}}.
+
+%% Handles aux casts:
+%%  - `active' / `inactive' update the utilisation tracking
+%%  - `tick' publishes the current utilisation to ets and evaluates GC
+%%  - `eval' leaves the aux state as it is
+handle_aux(_RaState, cast, Cmd, #aux{name = Name,
+                                     utilisation = Use} = Aux0,
+           Log, MacState) ->
+    Aux = case Cmd of
+              active ->
+                  Aux0#aux{utilisation = update_use(Use, active)};
+              inactive ->
+                  Aux0#aux{utilisation = update_use(Use, inactive)};
+              tick ->
+                  true = ets:insert(rabbit_fifo_usage,
+                                    {Name, utilisation(Use)}),
+                  eval_gc(Log, MacState, Aux0);
+              eval ->
+                  Aux0
+          end,
+    {no_reply, Aux, Log}.
+
+%% Runs a full garbage collection of the current process when the queue
+%% holds no messages, the log has advanced since the last sweep and the
+%% process memory exceeds ?GC_MEM_LIMIT_B. Returns the (possibly updated)
+%% aux state.
+eval_gc(Log, #?STATE{cfg = #cfg{resource = QR}} = MacState,
+        #aux{gc = #aux_gc{last_raft_idx = LastGcIdx} = Gc} = AuxState) ->
+    {Idx, _} = ra_log:last_index_term(Log),
+    {memory, MemBefore} = erlang:process_info(self(), memory),
+    QueueIsEmpty = messages_total(MacState) =:= 0,
+    ShouldSweep = QueueIsEmpty
+        andalso Idx > LastGcIdx
+        andalso MemBefore > ?GC_MEM_LIMIT_B,
+    case ShouldSweep of
+        true ->
+            garbage_collect(),
+            {memory, MemAfter} = erlang:process_info(self(), memory),
+            rabbit_log:debug("~s: full GC sweep complete. "
+                             "Process memory reduced from ~.2fMB to ~.2fMB.",
+                             [rabbit_misc:rs(QR), MemBefore/?MB, MemAfter/?MB]),
+            %% remember where we swept so an idle queue is not swept again
+            AuxState#aux{gc = Gc#aux_gc{last_raft_idx = Idx}};
+        false ->
+            AuxState
+    end.
+
+%%% Queries
+
+%% Query: number of messages ready for delivery.
+query_messages_ready(MacState) ->
+    messages_ready(MacState).
+
+%% Query: number of messages checked out, summed over all consumers.
+query_messages_checked_out(#?STATE{consumers = Consumers}) ->
+    CountOne = fun(_ConsumerId, #consumer{checked_out = Checked}, Total) ->
+                       maps:size(Checked) + Total
+               end,
+    maps:fold(CountOne, 0, Consumers).
+
+%% Query: total number of messages, as computed by messages_total/1.
+query_messages_total(MacState) ->
+    messages_total(MacState).
+
+%% Query: pids of all enqueuers and consumers, each listed once.
+query_processes(#?STATE{enqueuers = Enqueuers, consumers = Consumers}) ->
+    %% re-key the consumers by pid so they merge with the enqueuer map
+    ConsumersByPid = maps:fold(fun({_, Pid}, Consumer, Acc) ->
+                                       Acc#{Pid => Consumer}
+                               end, #{}, Consumers),
+    AllByPid = maps:merge(Enqueuers, ConsumersByPid),
+    maps:keys(AllByPid).
+
+
+%% Query: the `ra_indexes' field of the machine state.
+query_ra_indexes(#?STATE{ra_indexes = Indexes}) ->
+    Indexes.
+
+%% Query: number of consumers, counting both active and waiting ones.
+query_consumer_count(#?STATE{consumers = Active,
+                             waiting_consumers = Waiting}) ->
+    ActiveCount = maps:size(Active),
+    ActiveCount + length(Waiting).
+
+%% Query: a map from consumer id to
+%% `{Pid, Tag, Ack, Prefetch, Active, ActivityStatus, Args, Username}' for
+%% every active and waiting consumer that is not cancelled. When a consumer
+%% id appears in both collections the waiting entry wins.
+query_consumers(#?STATE{consumers = Consumers,
+                        waiting_consumers = WaitingConsumers,
+                        cfg = #cfg{consumer_strategy = ConsumerStrategy}} = State) ->
+    %% how "active" is derived depends on the consumer strategy
+    ActivityFun =
+        case ConsumerStrategy of
+            competing ->
+                fun(_ConsumerId, #consumer{status = Status}) ->
+                        {Status =/= suspected_down, Status}
+                end;
+            single_active ->
+                SingleActive = query_single_active_consumer(State),
+                fun({_, _} = ConsumerId, _) ->
+                        case SingleActive =:= {value, ConsumerId} of
+                            true -> {true, single_active};
+                            false -> {false, waiting}
+                        end
+                end
+        end,
+    %% shared by both collections: skip cancelled consumers, describe the rest
+    Collect =
+        fun(_, #consumer{status = cancelled}, Acc) ->
+                Acc;
+           ({Tag, Pid} = ConsumerId, #consumer{meta = Meta} = Consumer, Acc) ->
+                {Active, ActivityStatus} = ActivityFun(ConsumerId, Consumer),
+                Acc#{ConsumerId => {Pid, Tag,
+                                    maps:get(ack, Meta, undefined),
+                                    maps:get(prefetch, Meta, undefined),
+                                    Active,
+                                    ActivityStatus,
+                                    maps:get(args, Meta, []),
+                                    maps:get(username, Meta, undefined)}}
+        end,
+    FromConsumers = maps:fold(Collect, #{}, Consumers),
+    FromWaiting = lists:foldl(fun({ConsumerId, Consumer}, Acc) ->
+                                      Collect(ConsumerId, Consumer, Acc)
+                              end, #{}, WaitingConsumers),
+    maps:merge(FromConsumers, FromWaiting).
+
+%% Query: the id of the single active consumer.
+%% Returns `{value, ConsumerId}', `{error, no_value}' when the consumers map
+%% is empty, `{error, illegal_size}' when it has more than one entry, or
+%% `disabled' when the single active consumer strategy is not in use.
+query_single_active_consumer(
+  #?STATE{cfg = #cfg{consumer_strategy = single_active},
+          consumers = Consumers}) ->
+    case maps:keys(Consumers) of
+        [] ->
+            {error, no_value};
+        [ConsumerId] ->
+            {value, ConsumerId};
+        [_, _ | _] ->
+            {error, illegal_size}
+    end;
+query_single_active_consumer(_) ->
+    disabled.
+
+%% Query: `{ReadyMessages, ConsumerCount}' (waiting consumers not counted).
+query_stat(#?STATE{consumers = Consumers} = State) ->
+    Ready = messages_ready(State),
+    {Ready, maps:size(Consumers)}.
+
+%% Query: `{Length, Bytes}' taken from the in-memory counters of the state.
+query_in_memory_usage(#?STATE{msg_bytes_in_memory = InMemoryBytes,
+                              msgs_ready_in_memory = InMemoryLength}) ->
+    {InMemoryLength, InMemoryBytes}.
+
+%% Looks up the utilisation stored for `Name' in the `rabbit_fifo_usage'
+%% ets table; 0.0 when there is no entry.
+-spec usage(atom()) -> float().
+usage(Name) when is_atom(Name) ->
+    case ets:lookup(rabbit_fifo_usage, Name) of
+        [{_, Utilisation}] ->
+            Utilisation;
+        [] ->
+            0.0
+    end.
+
+%%% Internal
+
+%% Number of messages ready for delivery: the messages map, the returns
+%% queue and the two prefix message counters.
+messages_ready(#?STATE{messages = Messages,
+                       prefix_msgs = {ReturnCount, _Returns, PrefixCount, _Prefix},
+                       returns = Returns}) ->
+    %% the prefix message counts are carried in the tuple itself so no
+    %% list traversal is needed here
+    maps:size(Messages) + lqueue:len(Returns) + ReturnCount + PrefixCount.
+
+%% Total number of messages: the size of the raft index plus the two
+%% prefix message counters.
+messages_total(#?STATE{ra_indexes = Indexes,
+                       prefix_msgs = {ReturnCount, _Returns, PrefixCount, _Prefix}}) ->
+    IndexSize = rabbit_fifo_index:size(Indexes),
+    IndexSize + ReturnCount + PrefixCount.
+
+%% Moves the utilisation tracker between its two shapes:
+%%   {active, Since, Avg} and {inactive, Since, ActiveTime, Avg}.
+%% A transition to the state the tracker is already in is a no-op.
+update_use({inactive, _, _, _} = CUInfo, inactive) ->
+    CUInfo;
+update_use({active, _, _} = CUInfo, active) ->
+    CUInfo;
+update_use({active, Since, Avg}, inactive) ->
+    %% `microsecond' replaces the deprecated `micro_seconds' unit name
+    Now = erlang:monotonic_time(microsecond),
+    %% record how long the active period lasted
+    {inactive, Now, Now - Since, Avg};
+update_use({inactive, Since, Active, Avg}, active) ->
+    Now = erlang:monotonic_time(microsecond),
+    %% fold the finished active/inactive cycle into the running average
+    {active, Now, use_avg(Active, Now - Since, Avg)}.
+
+%% Current utilisation for a tracker tuple, including the time elapsed in
+%% the period that is still in progress.
+utilisation({active, Since, Avg}) ->
+    %% `microsecond' replaces the deprecated `micro_seconds' unit name
+    use_avg(erlang:monotonic_time(microsecond) - Since, 0, Avg);
+utilisation({inactive, Since, Active, Avg}) ->
+    use_avg(Active, erlang:monotonic_time(microsecond) - Since, Avg).
+
+%% Folds one active/inactive period into the running average `Avg'.
+%% When no time has passed at all the average is returned unchanged.
+use_avg(0, 0, Avg) ->
+    Avg;
+use_avg(ActiveTime, InactiveTime, Avg) ->
+    Elapsed = InactiveTime + ActiveTime,
+    ActiveRatio = ActiveTime / Elapsed,
+    moving_average(Elapsed, ?USE_AVG_HALF_LIFE, ActiveRatio, Avg).
+
+%% Exponentially weighted moving average. The weight of the previous value
+%% halves for every `HalfLife' units of `Elapsed' time. With no previous
+%% value (`undefined') the new sample is returned as is.
+moving_average(_Elapsed, _HalfLife, Sample, undefined) ->
+    Sample;
+moving_average(Elapsed, HalfLife, Sample, Previous) ->
+    OldWeight = math:exp(Elapsed * math:log(0.5) / HalfLife),
+    Sample * (1 - OldWeight) + Previous * OldWeight.
+
+%% Number of messages currently checked out, summed over all consumers.
+num_checked_out(#?STATE{consumers = Consumers}) ->
+    AddCheckedOut = fun(_ConsumerId, #consumer{checked_out = Checked}, Sum) ->
+                            maps:size(Checked) + Sum
+                    end,
+    maps:fold(AddCheckedOut, 0, Consumers).
+
+%% Cancels a consumer and returns `{State, Effects}'. With the single
+%% active consumer strategy and waiting consumers present, cancelling the
+%% active consumer also activates the next waiting one.
+cancel_consumer(ConsumerId,
+ #?STATE{cfg = #cfg{consumer_strategy = competing}} = State,
+ Effects, Reason) ->
+ cancel_consumer0(ConsumerId, State, Effects, Reason);
+cancel_consumer(ConsumerId,
+ #?STATE{cfg = #cfg{consumer_strategy = single_active},
+ waiting_consumers = []} = State,
+ Effects, Reason) ->
+ %% single active consumer on, no consumers are waiting
+ cancel_consumer0(ConsumerId, State, Effects, Reason);
+cancel_consumer(ConsumerId,
+ #?STATE{consumers = Cons0,
+ cfg = #cfg{consumer_strategy = single_active},
+ waiting_consumers = Waiting0} = State0,
+ Effects0, Reason) ->
+ %% single active consumer on, consumers are waiting
+ case maps:is_key(ConsumerId, Cons0) of
+ true ->
+ % The active consumer is to be removed
+ {State1, Effects1} = cancel_consumer0(ConsumerId, State0,
+ Effects0, Reason),
+ activate_next_consumer(State1, Effects1);
+ false ->
+ % The consumer being cancelled is not in the consumers map
+ % Just remove it from waiting_consumers
+ Waiting = lists:keydelete(ConsumerId, 1, Waiting0),
+ Effects = cancel_consumer_effects(ConsumerId, State0, Effects0),
+ % A waiting consumer isn't supposed to have any checked out messages,
+ % so nothing special to do here
+ {State0#?STATE{waiting_consumers = Waiting}, Effects}
+ end.
+
+%% Prepends to `Effects' a call to
+%% rabbit_quorum_queue:update_consumer_handler/8 carrying the consumer's
+%% ack mode, prefetch and args (read from its meta map) together with the
+%% given active flag and activity status.
+consumer_update_active_effects(#?STATE{cfg = #cfg{resource = QName}},
+                               ConsumerId, #consumer{meta = Meta},
+                               Active, ActivityStatus,
+                               Effects) ->
+    AckMode = maps:get(ack, Meta, undefined),
+    PrefetchCount = maps:get(prefetch, Meta, undefined),
+    ConsumerArgs = maps:get(args, Meta, []),
+    HandlerArgs = [QName, ConsumerId, false, AckMode, PrefetchCount,
+                   Active, ActivityStatus, ConsumerArgs],
+    UpdateEffect = {mod_call, rabbit_quorum_queue, update_consumer_handler,
+                    HandlerArgs},
+    [UpdateEffect | Effects].
+
+%% Cancels a consumer found in the consumers map and returns
+%% `{State, Effects}'; an unknown consumer id leaves both untouched.
+%% An `{aux, inactive}' effect is added when no consumers remain.
+cancel_consumer0(ConsumerId, #?STATE{consumers = C0} = S0, Effects0, Reason) ->
+ case C0 of
+ #{ConsumerId := Consumer} ->
+ {S, Effects2} = maybe_return_all(ConsumerId, Consumer, S0,
+ Effects0, Reason),
+ %% The effects are emitted before the consumer is actually removed
+ %% if the consumer has unacked messages. This is a bit weird but
+ %% in line with what classic queues do (from an external point of
+ %% view)
+ Effects = cancel_consumer_effects(ConsumerId, S, Effects2),
+ case maps:size(S#?STATE.consumers) of
+ 0 ->
+ {S, [{aux, inactive} | Effects]};
+ _ ->
+ {S, Effects}
+ end;
+ _ ->
+ %% already removed: do nothing
+ {S0, Effects0}
+ end.
+
+%% If no consumer in the consumers map has status `up', promotes the first
+%% waiting consumer with status `up' to the consumers map. Returns
+%% `{State, Effects}'.
+activate_next_consumer(#?STATE{consumers = Cons,
+ waiting_consumers = Waiting0} = State0,
+ Effects0) ->
+ case maps:filter(fun (_, #consumer{status = S}) -> S == up end, Cons) of
+ Up when map_size(Up) == 0 ->
+ %% there are no active consumer in the consumer map
+ case lists:filter(fun ({_, #consumer{status = Status}}) ->
+ Status == up
+ end, Waiting0) of
+ [{NextConsumerId, NextConsumer} | _] ->
+ %% there is a potential next active consumer
+ Remaining = lists:keydelete(NextConsumerId, 1, Waiting0),
+ #?STATE{service_queue = ServiceQueue} = State0,
+ ServiceQueue1 = maybe_queue_consumer(NextConsumerId,
+ NextConsumer,
+ ServiceQueue),
+ State = State0#?STATE{consumers = Cons#{NextConsumerId => NextConsumer},
+ service_queue = ServiceQueue1,
+ waiting_consumers = Remaining},
+ Effects = consumer_update_active_effects(State, NextConsumerId,
+ NextConsumer, true,
+ single_active, Effects0),
+ {State, Effects};
+ [] ->
+ %% no waiting consumer is `up' either
+ {State0, [{aux, inactive} | Effects0]}
+ end;
+ _ ->
+ %% at least one consumer is already `up': nothing to activate
+ {State0, Effects0}
+ end.
+
+
+
+%% Handles the consumer's checked out messages according to the cancel
+%% `Reason' and returns `{State, Effects}':
+%%  - consumer_cancel: the consumer is marked `cancelled' with no credit and
+%%    a `once' lifetime, then passed to update_or_remove_sub/5
+%%  - down: all checked out messages are returned and the consumer removed
+maybe_return_all(ConsumerId, Consumer,
+ #?STATE{consumers = C0,
+ service_queue = SQ0} = S0,
+ Effects0, Reason) ->
+ case Reason of
+ consumer_cancel ->
+ {Cons, SQ, Effects1} =
+ update_or_remove_sub(ConsumerId,
+ Consumer#consumer{lifetime = once,
+ credit = 0,
+ status = cancelled},
+ C0, SQ0, Effects0),
+ {S0#?STATE{consumers = Cons,
+ service_queue = SQ}, Effects1};
+ down ->
+ {S1, Effects1} = return_all(S0, Effects0, ConsumerId, Consumer),
+ {S1#?STATE{consumers = maps:remove(ConsumerId, S1#?STATE.consumers)},
+ Effects1}
+ end.
+
+%% Applies an enqueue command. A new message is added to the master index
+%% and followed by a checkout; a duplicate leaves the index untouched.
+%% Always replies `ok'.
+apply_enqueue(#{index := RaftIdx} = Meta, From, Seq, RawMsg, State0) ->
+ case maybe_enqueue(RaftIdx, From, Seq, RawMsg, [], State0) of
+ {ok, State1, Effects1} ->
+ State2 = append_to_master_index(RaftIdx, State1),
+ {State, ok, Effects} = checkout(Meta, State2, Effects1),
+ {maybe_store_dehydrated_state(RaftIdx, State), ok, Effects};
+ {duplicate, State, Effects} ->
+ {State, ok, Effects}
+ end.
+
+%% Removes the next message (as chosen by take_next_msg/1) from the queue.
+%% Returns {State, Effects}. The enqueue byte counter is reduced for every
+%% kind of message; only indexed messages are removed from the raft index
+%% and handed to dead_letter_effects/4 with reason `maxlen'.
+drop_head(#?STATE{ra_indexes = Indexes0} = State0, Effects0) ->
+ case take_next_msg(State0) of
+ {FullMsg = {_MsgId, {RaftIdxToDrop, {Header, Msg}}},
+ State1} ->
+ %% indexed message: forget its raft index and its enqueue bytes
+ Indexes = rabbit_fifo_index:delete(RaftIdxToDrop, Indexes0),
+ State2 = add_bytes_drop(Header, State1#?STATE{ra_indexes = Indexes}),
+ State = case Msg of
+ %% 'empty' payloads are not counted as in-memory messages
+ 'empty' -> State2;
+ _ -> subtract_in_memory_counts(Header, State2)
+ end,
+ Effects = dead_letter_effects(maxlen, #{none => FullMsg},
+ State, Effects0),
+ {State, Effects};
+ {{'$prefix_msg', Header}, State1} ->
+ %% prefix message: adjust byte and in-memory counters, no dead lettering
+ State2 = subtract_in_memory_counts(Header, add_bytes_drop(Header, State1)),
+ {State2, Effects0};
+ {{'$empty_msg', Header}, State1} ->
+ %% empty prefix message: only the enqueue byte counter is adjusted
+ State2 = add_bytes_drop(Header, State1),
+ {State2, Effects0};
+ empty ->
+ %% nothing left to drop
+ {State0, Effects0}
+ end.
+
+%% Adds RawMsg to the `messages' map under the next message number and
+%% updates the enqueue byte counter. Returns the updated state.
+%% When evaluate_memory_limit/2 returns true the payload is replaced by the
+%% atom 'empty' (only the raft index and the header are kept) and the
+%% in-memory counters are left untouched.
+enqueue(RaftIdx, RawMsg, #?STATE{messages = Messages,
+ low_msg_num = LowMsgNum,
+ next_msg_num = NextMsgNum} = State0) ->
+ %% the initial header is an integer only - it will get expanded to a map
+ %% when the next required key is added
+ Header = message_size(RawMsg),
+ {State1, Msg} =
+ case evaluate_memory_limit(Header, State0) of
+ true ->
+ % indexed message without an in-memory payload
+ {State0, {RaftIdx, {Header, 'empty'}}};
+ false ->
+ {add_in_memory_counts(Header, State0),
+ {RaftIdx, {Header, RawMsg}}} % indexed message with in-memory payload
+ end,
+ State = add_bytes_enqueue(Header, State1),
+ State#?STATE{messages = Messages#{NextMsgNum => Msg},
+ %% min/2 yields NextMsgNum when low_msg_num is still the atom
+ %% `undefined' (numbers sort before atoms in term order),
+ %% otherwise the smaller of the two numbers is kept
+ low_msg_num = min(LowMsgNum, NextMsgNum),
+ next_msg_num = NextMsgNum + 1}.
+
+%% Records RaftIdx in the raft index structure and bumps the enqueue counter
+%% (see incr_enqueue_count/1). Returns the updated state.
+append_to_master_index(RaftIdx, #?STATE{ra_indexes = Indexes0} = State0) ->
+    Counted = incr_enqueue_count(State0),
+    Counted#?STATE{ra_indexes = rabbit_fifo_index:append(RaftIdx, Indexes0)}.
+
+
+%% Bumps the enqueue counter. The first clause only matches when the counter
+%% equals the second element of `release_cursor_interval' (the variable C is
+%% bound twice in the pattern); in that case the counter is reset to 0, which
+%% is the value maybe_store_dehydrated_state/2 matches on.
+incr_enqueue_count(#?STATE{enqueue_count = C,
+ cfg = #cfg{release_cursor_interval = {_Base, C}}
+ } = State0) ->
+ %% this will trigger a dehydrated version of the state to be stored
+ %% at this raft index for potential future snapshot generation
+ %% Q: Why don't we just stash the release cursor here?
+ %% A: Because it needs to be the very last thing we do and we
+ %% first need to run the checkout logic.
+ State0#?STATE{enqueue_count = 0};
+incr_enqueue_count(#?STATE{cfg = #cfg{release_cursor_interval = C} = Cfg}
+ = State0)
+ when is_integer(C) ->
+ %% conversion to new release cursor interval format: an integer interval
+ %% C becomes the tuple {C, C} and the function is retried
+ State = State0#?STATE{cfg = Cfg#cfg{release_cursor_interval = {C, C}}},
+ incr_enqueue_count(State);
+incr_enqueue_count(#?STATE{enqueue_count = C} = State) ->
+ %% interval not reached yet: plain increment
+ State#?STATE{enqueue_count = C + 1}.
+
+%% Stores a dehydrated copy of the state as a potential release cursor for
+%% RaftIdx. This only happens when `enqueue_count' is 0 (the value
+%% incr_enqueue_count/1 resets the counter to) and RaftIdx is still present
+%% in the raft index. In every other case the state is returned unchanged,
+%% except that an integer `release_cursor_interval' is first converted to
+%% the tuple format.
+maybe_store_dehydrated_state(RaftIdx,
+ #?STATE{cfg =
+ #cfg{release_cursor_interval = {Base, _}}
+ = Cfg,
+ ra_indexes = Indexes,
+ enqueue_count = 0,
+ release_cursors = Cursors0} = State0) ->
+ case rabbit_fifo_index:exists(RaftIdx, Indexes) of
+ false ->
+ %% the incoming enqueue must already have been dropped
+ State0;
+ true ->
+ %% the next interval is the total number of messages clamped to
+ %% the range [Base, ?RELEASE_CURSOR_EVERY_MAX]; a Base of 0 keeps
+ %% the interval at 0
+ Interval = case Base of
+ 0 -> 0;
+ _ ->
+ Total = messages_total(State0),
+ min(max(Total, Base),
+ ?RELEASE_CURSOR_EVERY_MAX)
+ end,
+ State = convert_prefix_msgs(
+ State0#?STATE{cfg = Cfg#cfg{release_cursor_interval =
+ {Base, Interval}}}),
+ Dehydrated = dehydrate_state(State),
+ Cursor = {release_cursor, RaftIdx, Dehydrated},
+ %% cursors are queued here; update_smallest_raft_index/3 is where
+ %% they are turned into effects
+ Cursors = lqueue:in(Cursor, Cursors0),
+ State#?STATE{release_cursors = Cursors}
+ end;
+maybe_store_dehydrated_state(RaftIdx,
+ #?STATE{cfg =
+ #cfg{release_cursor_interval = C} = Cfg}
+ = State0)
+ when is_integer(C) ->
+ %% convert to new format
+ State = State0#?STATE{cfg = Cfg#cfg{release_cursor_interval = {C, C}}},
+ maybe_store_dehydrated_state(RaftIdx, State);
+maybe_store_dehydrated_state(_RaftIdx, State) ->
+ %% enqueue_count is not 0: nothing to store
+ State.
+
+%% Enqueues buffered out-of-order messages for as long as the head of the
+%% pending list carries the next expected sequence number, then stores the
+%% updated enqueuer under From. Returns the updated state.
+enqueue_pending(From,
+                #enqueuer{next_seqno = SeqNo,
+                          pending = [{SeqNo, RaftIdx, RawMsg} | Rest]} = Enq,
+                State) ->
+    enqueue_pending(From,
+                    Enq#enqueuer{next_seqno = SeqNo + 1, pending = Rest},
+                    enqueue(RaftIdx, RawMsg, State));
+enqueue_pending(From, Enq, #?STATE{enqueuers = Known} = State) ->
+    State#?STATE{enqueuers = maps:put(From, Enq, Known)}.
+
+%% Enqueues RawMsg while enforcing per-sender ordering by sequence number.
+%% Returns {ok, State, Effects} when the message was accepted (enqueued or
+%% buffered as pending) and {duplicate, State, Effects} when its sequence
+%% number is lower than the next expected one.
+maybe_enqueue(RaftIdx, undefined, undefined, RawMsg, Effects, State0) ->
+ % direct enqueue without tracking
+ State = enqueue(RaftIdx, RawMsg, State0),
+ {ok, State, Effects};
+maybe_enqueue(RaftIdx, From, MsgSeqNo, RawMsg, Effects0,
+ #?STATE{enqueuers = Enqueuers0} = State0) ->
+ case maps:get(From, Enqueuers0, undefined) of
+ undefined ->
+ %% first message seen from this sender: register a fresh #enqueuer{},
+ %% retry and ask for the sender process to be monitored
+ State1 = State0#?STATE{enqueuers = Enqueuers0#{From => #enqueuer{}}},
+ {ok, State, Effects} = maybe_enqueue(RaftIdx, From, MsgSeqNo,
+ RawMsg, Effects0, State1),
+ {ok, State, [{monitor, process, From} | Effects]};
+ #enqueuer{next_seqno = MsgSeqNo} = Enq0 ->
+ % it is the next expected seqno
+ State1 = enqueue(RaftIdx, RawMsg, State0),
+ Enq = Enq0#enqueuer{next_seqno = MsgSeqNo + 1},
+ %% also flush any buffered messages that are now in sequence
+ State = enqueue_pending(From, Enq, State1),
+ {ok, State, Effects0};
+ #enqueuer{next_seqno = Next,
+ pending = Pending0} = Enq0
+ when MsgSeqNo > Next ->
+ % out of order delivery: buffer it, keeping pending sorted by seqno
+ Pending = [{MsgSeqNo, RaftIdx, RawMsg} | Pending0],
+ Enq = Enq0#enqueuer{pending = lists:sort(Pending)},
+ {ok, State0#?STATE{enqueuers = Enqueuers0#{From => Enq}}, Effects0};
+ #enqueuer{next_seqno = Next} when MsgSeqNo =< Next ->
+ % duplicate delivery - the state is returned untouched. Equality is
+ % already handled by the clause above, so this is only reached for
+ % MsgSeqNo < Next.
+ {duplicate, State0, Effects0}
+ end.
+
+%% Returns the second element of the tuple Tuple.
+snd(Tuple) ->
+    erlang:element(2, Tuple).
+
+%% Returns the messages in the map Returned (msg id => checked out message)
+%% to the queue on behalf of ConsumerId, tops up the consumer's credit, runs
+%% a checkout and finally gives update_smallest_raft_index/3 the chance to
+%% emit a release cursor. Returns {State, ok, Effects}.
+return(#{index := IncomingRaftIdx} = Meta, ConsumerId, Returned,
+ Effects0, #?STATE{service_queue = SQ0} = State0) ->
+ {State1, Effects1} = maps:fold(
+ fun(MsgId, {Tag, _} = Msg, {S0, E0})
+ when Tag == '$prefix_msg';
+ Tag == '$empty_msg'->
+ %% prefix messages carry no message number: 0 is used
+ return_one(MsgId, 0, Msg, S0, E0, ConsumerId);
+ (MsgId, {MsgNum, Msg}, {S0, E0}) ->
+ return_one(MsgId, MsgNum, Msg, S0, E0,
+ ConsumerId)
+ end, {State0, Effects0}, Returned),
+ {State2, Effects3} =
+ case State1#?STATE.consumers of
+ #{ConsumerId := Con0} = Cons0 ->
+ Con = Con0#consumer{credit = increase_credit(Con0,
+ map_size(Returned))},
+ %% NOTE(review): SQ0 is the service queue of State0, not of
+ %% State1 - any service queue change made by return_one/6 is
+ %% replaced here. Confirm this is intended.
+ {Cons, SQ, Effects2} = update_or_remove_sub(ConsumerId, Con,
+ Cons0, SQ0, Effects1),
+ {State1#?STATE{consumers = Cons,
+ service_queue = SQ}, Effects2};
+ _ ->
+ %% the consumer is no longer in the consumer map
+ {State1, Effects1}
+ end,
+ {State, ok, Effects} = checkout(Meta, State2, Effects3),
+ update_smallest_raft_index(IncomingRaftIdx, State, Effects).
+
+% used to process messages that are finished
+% Discarded is a map of msg id => checked out message. The messages are
+% removed from the consumer's checked_out map, their raft indexes are
+% deleted from the index and their bytes are subtracted from the checkout
+% byte counter. Returns {State, Effects}.
+complete(ConsumerId, Discarded,
+ #consumer{checked_out = Checked} = Con0, Effects0,
+ #?STATE{consumers = Cons0, service_queue = SQ0,
+ ra_indexes = Indexes0} = State0) ->
+ %% TODO optimise use of Discarded map here
+ %% only indexed messages carry a raft index; the 2-tuple prefix / empty
+ %% messages do not match the generator pattern and are skipped
+ MsgRaftIdxs = [RIdx || {_, {RIdx, _}} <- maps:values(Discarded)],
+ %% credit_mode = simple_prefetch should automatically top-up credit
+ %% as messages are settled or otherwise returned
+ Con = Con0#consumer{checked_out = maps:without(maps:keys(Discarded), Checked),
+ credit = increase_credit(Con0, map_size(Discarded))},
+ {Cons, SQ, Effects} = update_or_remove_sub(ConsumerId, Con, Cons0,
+ SQ0, Effects0),
+ Indexes = lists:foldl(fun rabbit_fifo_index:delete/2, Indexes0,
+ MsgRaftIdxs),
+ %% TODO: use maps:fold instead
+ State1 = lists:foldl(fun({_, {_, {Header, _}}}, Acc) ->
+ add_bytes_settle(Header, Acc);
+ ({'$prefix_msg', Header}, Acc) ->
+ add_bytes_settle(Header, Acc);
+ ({'$empty_msg', Header}, Acc) ->
+ add_bytes_settle(Header, Acc)
+ end, State0, maps:values(Discarded)),
+ {State1#?STATE{consumers = Cons,
+ ra_indexes = Indexes,
+ service_queue = SQ}, Effects}.
+
+%% Returns the consumer's credit after Extra messages were settled or
+%% returned. Consumers with lifetime `once', and `auto' consumers using
+%% credit_mode `credited', never get their credit topped up automatically;
+%% every other consumer gains Extra credit.
+increase_credit(#consumer{lifetime = Lifetime,
+                          credit_mode = Mode,
+                          credit = Current}, Extra) ->
+    case {Lifetime, Mode} of
+        {once, _} ->
+            Current;
+        {auto, credited} ->
+            Current;
+        _ ->
+            Current + Extra
+    end.
+
+%% Settles the given message ids for a consumer and then runs a checkout.
+%% Only ids that are present in the consumer's checked_out map are settled.
+%% The result is passed through update_smallest_raft_index/3 so that a
+%% release cursor can be emitted if one became available.
+complete_and_checkout(#{index := IncomingRaftIdx} = Meta, MsgIds, ConsumerId,
+                      #consumer{checked_out = Checked} = Con,
+                      Effects0, State0) ->
+    Settled = maps:with(MsgIds, Checked),
+    {Completed, CompleteEffects} = complete(ConsumerId, Settled, Con,
+                                            Effects0, State0),
+    {State, ok, Effects} = checkout(Meta, Completed, CompleteEffects),
+    update_smallest_raft_index(IncomingRaftIdx, State, Effects).
+
+%% Prepends the effect that hands the Discarded messages to the configured
+%% dead letter handler. Without a handler the effects are returned unchanged.
+%% With a {Mod, Fun, Args} handler a `log' effect is produced: payloads that
+%% are not held in memory ('empty') are looked up by raft index from the log
+%% entries given to the effect's fun, then Mod:Fun is called with Args plus
+%% a list of {Reason, Msg} tuples. Prefix / empty prefix messages (2-tuples)
+%% are skipped.
+dead_letter_effects(_Reason, _Discarded,
+ #?STATE{cfg = #cfg{dead_letter_handler = undefined}},
+ Effects) ->
+ Effects;
+dead_letter_effects(Reason, Discarded,
+ #?STATE{cfg = #cfg{dead_letter_handler = {Mod, Fun, Args}}},
+ Effects) ->
+ %% raft indexes of the messages whose payload has to be read from the log
+ RaftIdxs = maps:fold(
+ fun (_, {_, {RaftIdx, {_Header, 'empty'}}}, Acc) ->
+ [RaftIdx | Acc];
+ (_, _, Acc) ->
+ Acc
+ end, [], Discarded),
+ [{log, RaftIdxs,
+ fun (Log) ->
+ %% Log is zipped with RaftIdxs, so it must hold one entry per
+ %% requested index, in the same order
+ Lookup = maps:from_list(lists:zip(RaftIdxs, Log)),
+ DeadLetters = maps:fold(
+ fun (_, {_, {RaftIdx, {_Header, 'empty'}}}, Acc) ->
+ {enqueue, _, _, Msg} = maps:get(RaftIdx, Lookup),
+ [{Reason, Msg} | Acc];
+ (_, {_, {_, {_Header, Msg}}}, Acc) ->
+ [{Reason, Msg} | Acc];
+ (_, _, Acc) ->
+ Acc
+ end, [], Discarded),
+ [{mod_call, Mod, Fun, Args ++ [DeadLetters]}]
+ end} | Effects].
+
+%% Prepends the effect that calls
+%% rabbit_quorum_queue:cancel_consumer_handler/2 with the queue resource and
+%% the consumer id.
+cancel_consumer_effects(ConsumerId,
+                        #?STATE{cfg = #cfg{resource = QName}}, Effects) ->
+    Notify = {mod_call, rabbit_quorum_queue,
+              cancel_consumer_handler, [QName, ConsumerId]},
+    [Notify | Effects].
+
+%% Appends a release_cursor effect when one can be emitted.
+%% With an empty raft index a cursor for IncomingRaftIdx is emitted and all
+%% stored cursors are discarded. Otherwise the stored cursor with the highest
+%% index below the smallest live raft index is emitted, if there is one.
+%% Returns {State, ok, Effects}.
+update_smallest_raft_index(IncomingRaftIdx,
+ #?STATE{ra_indexes = Indexes,
+ release_cursors = Cursors0} = State0,
+ Effects) ->
+ case rabbit_fifo_index:size(Indexes) of
+ 0 ->
+ % there are no messages on queue anymore and no pending enqueues
+ % we can forward release_cursor all the way until
+ % the last received command, hooray
+ State = State0#?STATE{release_cursors = lqueue:new()},
+ {State, ok, Effects ++ [{release_cursor, IncomingRaftIdx, State}]};
+ _ ->
+ Smallest = rabbit_fifo_index:smallest(Indexes),
+ case find_next_cursor(Smallest, Cursors0) of
+ {empty, Cursors} ->
+ %% no stored cursor is old enough yet
+ {State0#?STATE{release_cursors = Cursors},
+ ok, Effects};
+ {Cursor, Cursors} ->
+ %% we can emit a release cursor: we've passed the smallest
+ %% release cursor available.
+ {State0#?STATE{release_cursors = Cursors}, ok,
+ Effects ++ [Cursor]}
+ end
+ end.
+
+%% Pops cursors off the front of the cursor queue for as long as their raft
+%% index is below Idx. Returns {Cursor, RemainingCursors} where Cursor is the
+%% last cursor popped, or the atom `empty' when none qualified.
+find_next_cursor(Idx, Cursors) ->
+    find_next_cursor(Idx, Cursors, empty).
+
+find_next_cursor(Smallest, Cursors0, Best) ->
+    case lqueue:out(Cursors0) of
+        {{value, {_, Idx, _} = Candidate}, Rest} when Idx < Smallest ->
+            %% a usable cursor, but a later one may be usable as well
+            find_next_cursor(Smallest, Rest, Candidate);
+        _ ->
+            {Best, Cursors0}
+    end.
+
+%% Updates Key in a message header using UpdateFun, or sets it to Default
+%% when the key is absent. An integer header (size only) is first expanded
+%% to the map #{size => Integer}.
+update_header(Key, UpdateFun, Default, Size)
+  when is_integer(Size) ->
+    maps:update_with(Key, UpdateFun, Default, #{size => Size});
+update_header(Key, UpdateFun, Default, Header) ->
+    maps:update_with(Key, UpdateFun, Default, Header).
+
+
+%% Returns a single checked out message of ConsumerId to the queue.
+%% The header's delivery_count is incremented (set to 1 if absent). When the
+%% new count is greater than the configured delivery_limit the message is
+%% settled via complete/5 instead of being requeued; indexed messages are
+%% additionally dead lettered with reason `delivery_limit'. Otherwise the
+%% message is removed from the consumer's checked_out map and appended to
+%% the `returns' queue. Returns {State, Effects}.
+%% NOTE(review): if delivery_limit is an atom such as `undefined' the guard
+%% `DeliveryCount > DeliveryLimit' can never be true for an integer count
+%% (numbers sort before atoms) - presumably that is how "no limit" is
+%% expressed; confirm against the config handling.
+return_one(MsgId, 0, {Tag, Header0},
+ #?STATE{returns = Returns,
+ consumers = Consumers,
+ cfg = #cfg{delivery_limit = DeliveryLimit}} = State0,
+ Effects0, ConsumerId)
+ when Tag == '$prefix_msg'; Tag == '$empty_msg' ->
+ %% prefix messages: there is no raft index and no payload to dead letter
+ #consumer{checked_out = Checked} = Con0 = maps:get(ConsumerId, Consumers),
+ Header = update_header(delivery_count, fun (C) -> C+1 end, 1, Header0),
+ Msg0 = {Tag, Header},
+ case maps:get(delivery_count, Header) of
+ DeliveryCount when DeliveryCount > DeliveryLimit ->
+ complete(ConsumerId, #{MsgId => Msg0}, Con0, Effects0, State0);
+ _ ->
+ %% this should not affect the release cursor in any way
+ Con = Con0#consumer{checked_out = maps:remove(MsgId, Checked)},
+ {Msg, State1} = case Tag of
+ '$empty_msg' ->
+ {Msg0, State0};
+ _ -> case evaluate_memory_limit(Header, State0) of
+ true ->
+ %% over the in-memory limit: requeue as an empty message
+ {{'$empty_msg', Header}, State0};
+ false ->
+ {Msg0, add_in_memory_counts(Header, State0)}
+ end
+ end,
+ {add_bytes_return(
+ Header,
+ State1#?STATE{consumers = Consumers#{ConsumerId => Con},
+ returns = lqueue:in(Msg, Returns)}),
+ Effects0}
+ end;
+return_one(MsgId, MsgNum, {RaftId, {Header0, RawMsg}},
+ #?STATE{returns = Returns,
+ consumers = Consumers,
+ cfg = #cfg{delivery_limit = DeliveryLimit}} = State0,
+ Effects0, ConsumerId) ->
+ %% indexed messages
+ #consumer{checked_out = Checked} = Con0 = maps:get(ConsumerId, Consumers),
+ Header = update_header(delivery_count, fun (C) -> C+1 end, 1, Header0),
+ Msg0 = {RaftId, {Header, RawMsg}},
+ case maps:get(delivery_count, Header) of
+ DeliveryCount when DeliveryCount > DeliveryLimit ->
+ DlMsg = {MsgNum, Msg0},
+ Effects = dead_letter_effects(delivery_limit, #{none => DlMsg},
+ State0, Effects0),
+ complete(ConsumerId, #{MsgId => DlMsg}, Con0, Effects, State0);
+ _ ->
+ Con = Con0#consumer{checked_out = maps:remove(MsgId, Checked)},
+ %% this should not affect the release cursor in any way
+ {Msg, State1} = case RawMsg of
+ 'empty' ->
+ {Msg0, State0};
+ _ ->
+ case evaluate_memory_limit(Header, State0) of
+ true ->
+ %% over the in-memory limit: drop the payload, keep the index
+ {{RaftId, {Header, 'empty'}}, State0};
+ false ->
+ {Msg0, add_in_memory_counts(Header, State0)}
+ end
+ end,
+ {add_bytes_return(
+ Header,
+ State1#?STATE{consumers = Consumers#{ConsumerId => Con},
+ returns = lqueue:in({MsgNum, Msg}, Returns)}),
+ Effects0}
+ end.
+
+%% Returns every message checked out by the consumer, one by one via
+%% return_one/6, in ascending message id order (the order they were checked
+%% out in). Con is stored in the consumer map first, as return_one/6 looks
+%% the consumer up there. Returns {State, Effects}.
+return_all(#?STATE{consumers = Cons} = State0, Effects0, ConsumerId,
+           #consumer{checked_out = Checked} = Con) ->
+    InOrder = lists:sort(maps:to_list(Checked)),
+    ReturnOne = fun ({MsgId, {Tag, _} = Msg}, {S, E})
+                      when Tag == '$prefix_msg'; Tag == '$empty_msg' ->
+                        return_one(MsgId, 0, Msg, S, E, ConsumerId);
+                    ({MsgId, {MsgNum, Msg}}, {S, E}) ->
+                        return_one(MsgId, MsgNum, Msg, S, E, ConsumerId)
+                end,
+    Seed = {State0#?STATE{consumers = Cons#{ConsumerId => Con}}, Effects0},
+    lists:foldl(ReturnOne, Seed, InOrder).
+
+%% checkout new messages to consumers
+%% After delivering as much as possible the queue limits are enforced; if
+%% that dropped anything, update_smallest_raft_index/3 is given the chance
+%% to emit a release cursor. Returns {State, ok, Effects}.
+checkout(#{index := Index}, State0, Effects0) ->
+    {Delivered, _Result, DeliveryEffects} =
+        checkout0(checkout_one(State0), Effects0, {#{}, #{}}),
+    {State, Dropped, Effects} =
+        evaluate_limit(false, Delivered, DeliveryEffects),
+    case Dropped of
+        true ->
+            update_smallest_raft_index(Index, State, Effects);
+        false ->
+            {State, ok, Effects}
+    end.
+
+%% Keeps calling checkout_one/1 until it reports that nothing more can be
+%% delivered, accumulating per consumer the messages that can be sent as is
+%% (SendAcc) and the ones whose payload has to be read from the log first
+%% (LogAcc). Returns {State, ok, Effects} with the effects list reversed.
+checkout0({success, ConsumerId, MsgId, {RaftIdx, {Header, 'empty'}}, State},
+          Effects, {SendAcc, LogAcc0}) ->
+    %% payload is not in memory: remember the raft index to read it from
+    Entry = {RaftIdx, {MsgId, Header}},
+    Prepend = fun (Entries) -> [Entry | Entries] end,
+    LogAcc = maps:update_with(ConsumerId, Prepend, [Entry], LogAcc0),
+    checkout0(checkout_one(State), Effects, {SendAcc, LogAcc});
+checkout0({success, ConsumerId, MsgId, Msg, State}, Effects,
+          {SendAcc0, LogAcc}) ->
+    Entry = {MsgId, Msg},
+    Prepend = fun (Entries) -> [Entry | Entries] end,
+    SendAcc = maps:update_with(ConsumerId, Prepend, [Entry], SendAcc0),
+    checkout0(checkout_one(State), Effects, {SendAcc, LogAcc});
+checkout0({Activity, State}, Effects0, {SendAcc, LogAcc}) ->
+    Delivery = append_send_msg_effects(
+                 append_log_effects(Effects0, LogAcc), SendAcc),
+    Effects = case Activity of
+                  nochange ->
+                      Delivery;
+                  inactive ->
+                      [{aux, inactive} | Delivery]
+              end,
+    {State, ok, lists:reverse(Effects)}.
+
+%% Drops messages from the head of the queue for as long as the queue is
+%% over its length / bytes limit. Returns {State, Result, Effects} where
+%% Result is `true' once at least one message was dropped and the Result
+%% passed in otherwise. With neither limit configured nothing is done.
+evaluate_limit(Result,
+               #?STATE{cfg = #cfg{max_length = undefined,
+                                  max_bytes = undefined}} = State,
+               Effects) ->
+    {State, Result, Effects};
+evaluate_limit(Result, State0, Effects0) ->
+    Converted = convert_prefix_msgs(State0),
+    case is_over_limit(Converted) of
+        false ->
+            {Converted, Result, Effects0};
+        true ->
+            {Trimmed, Effects} = drop_head(Converted, Effects0),
+            evaluate_limit(true, Trimmed, Effects)
+    end.
+
+%% Returns true when keeping a message of the given size (an integer, or a
+%% header map with a `size' key) in memory would reach the in-memory length
+%% limit or exceed the in-memory bytes limit. Always false when neither
+%% limit is configured. A single limit left as `undefined' never trips:
+%% numbers sort before atoms in Erlang term order.
+evaluate_memory_limit(_Header,
+                      #?STATE{cfg = #cfg{max_in_memory_length = undefined,
+                                         max_in_memory_bytes = undefined}}) ->
+    false;
+evaluate_memory_limit(#{size := Size}, State) ->
+    evaluate_memory_limit(Size, State);
+evaluate_memory_limit(Size,
+                      #?STATE{cfg = #cfg{max_in_memory_length = MaxLength,
+                                         max_in_memory_bytes = MaxBytes},
+                              msg_bytes_in_memory = Bytes,
+                              msgs_ready_in_memory = Length})
+  when is_integer(Size) ->
+    LengthReached = Length >= MaxLength,
+    LengthReached orelse (Bytes + Size) > MaxBytes.
+
+%% Prepends one send_msg effect per consumer in AccMap (consumer id => list
+%% of messages, newest first) followed by an `{aux, active}' effect.
+%% An empty AccMap leaves the effects untouched.
+append_send_msg_effects(Effects, AccMap) when map_size(AccMap) == 0 ->
+    Effects;
+append_send_msg_effects(Effects0, AccMap) ->
+    AddDelivery = fun (ConsumerId, Msgs, Acc) ->
+                          [send_msg_effect(ConsumerId, lists:reverse(Msgs)) | Acc]
+                  end,
+    [{aux, active} | maps:fold(AddDelivery, Effects0, AccMap)].
+
+%% Prepends one log effect per consumer in AccMap (consumer id => list of
+%% {RaftIdx, {MsgId, Header}} entries, newest first).
+append_log_effects(Effects0, AccMap) ->
+    AddLogRead = fun (ConsumerId, Entries, Acc) ->
+                         [send_log_effect(ConsumerId, lists:reverse(Entries)) | Acc]
+                 end,
+    maps:fold(AddLogRead, Effects0, AccMap).
+
+%% next message is determined as follows:
+%% First we check if there are prefix returns
+%% Then we check if there are current returns
+%% then we check prefix msgs
+%% then we check current messages
+%%
+%% When we return it is always done to the current return queue
+%% for both prefix messages and current messages
+%%
+%% Returns {Msg, State} or the atom `empty' when there is nothing to take.
+take_next_msg(#?STATE{prefix_msgs = {R, P}} = State) ->
+ %% conversion of the 2-tuple prefix_msgs format to the 4-tuple format
+ %% that also carries the list lengths
+ take_next_msg(State#?STATE{prefix_msgs = {length(R), R, length(P), P}});
+take_next_msg(#?STATE{prefix_msgs = {NumR, [{'$empty_msg', _} = Msg | Rem],
+ NumP, P}} = State) ->
+ %% there are prefix returns, these should be served first
+ {Msg, State#?STATE{prefix_msgs = {NumR-1, Rem, NumP, P}}};
+take_next_msg(#?STATE{prefix_msgs = {NumR, [Header | Rem], NumP, P}} = State) ->
+ %% there are prefix returns, these should be served first
+ %% (a bare header in the list stands for a '$prefix_msg')
+ {{'$prefix_msg', Header},
+ State#?STATE{prefix_msgs = {NumR-1, Rem, NumP, P}}};
+take_next_msg(#?STATE{returns = Returns,
+ low_msg_num = Low0,
+ messages = Messages0,
+ prefix_msgs = {NumR, R, NumP, P}} = State) ->
+ %% use peek rather than out there as the most likely case is an empty
+ %% queue
+ case lqueue:peek(Returns) of
+ {value, NextMsg} ->
+ {NextMsg,
+ State#?STATE{returns = lqueue:drop(Returns)}};
+ empty when P == [] ->
+ %% no returns and no prefix messages: take from the messages map
+ case Low0 of
+ undefined ->
+ empty;
+ _ ->
+ {Msg, Messages} = maps:take(Low0, Messages0),
+ case maps:size(Messages) of
+ 0 ->
+ %% that was the last message
+ {{Low0, Msg},
+ State#?STATE{messages = Messages,
+ low_msg_num = undefined}};
+ _ ->
+ %% the next lowest number is taken to be Low0 + 1
+ {{Low0, Msg},
+ State#?STATE{messages = Messages,
+ low_msg_num = Low0 + 1}}
+ end
+ end;
+ empty ->
+ [Msg | Rem] = P,
+ case Msg of
+ {Header, 'empty'} ->
+ %% There are prefix msgs
+ {{'$empty_msg', Header},
+ State#?STATE{prefix_msgs = {NumR, R, NumP-1, Rem}}};
+ Header ->
+ {{'$prefix_msg', Header},
+ State#?STATE{prefix_msgs = {NumR, R, NumP-1, Rem}}}
+ end
+ end.
+
+%% Builds the send_msg effect that delivers Msgs, tagged with the consumer
+%% tag, to the consumer pid.
+send_msg_effect({CTag, CPid}, Msgs) ->
+    Delivery = {delivery, CTag, Msgs},
+    {send_msg, CPid, Delivery, [local, ra_event]}.
+
+%% Builds a log effect for messages whose payload is not held in memory.
+%% IdxMsgs is a list of {RaftIdx, {MsgId, Header}}. The effect's fun pairs
+%% the enqueue commands it is given with the message ids / headers and
+%% produces a send_msg effect for the consumer. The effect is tagged with
+%% {local, node(CPid)}.
+send_log_effect({CTag, CPid}, IdxMsgs) ->
+    {RaftIdxs, Data} = lists:unzip(IdxMsgs),
+    Hydrate = fun ({enqueue, _, _, Msg}, {MsgId, Header}) ->
+                      {MsgId, {Header, Msg}}
+              end,
+    Deliver = fun (Log) ->
+                      Msgs = lists:zipwith(Hydrate, Log, Data),
+                      [{send_msg, CPid, {delivery, CTag, Msgs}, [local, ra_event]}]
+              end,
+    {log, RaftIdxs, Deliver, {local, node(CPid)}}.
+
+%% Builds a log effect that reads the enqueue command stored at RaftIdx and
+%% replies to From with a wrapped `dequeue' result containing the message.
+reply_log_effect(RaftIdx, MsgId, Header, Ready, From) ->
+    Reply = fun ([{enqueue, _, _, Msg}]) ->
+                    Dequeued = {dequeue, {MsgId, {Header, Msg}}, Ready},
+                    [{reply, From, {wrap_reply, Dequeued}}]
+            end,
+    {log, [RaftIdx], Reply}.
+
+%% Attempts to assign the next message to the consumer at the front of the
+%% service queue. Returns
+%%   {success, ConsumerId, MsgId, Msg, State} when a message was checked out,
+%%   {nochange, State} when there is no message to take, or no consumer is
+%%                     queued and the messages map is empty,
+%%   {inactive, State} when no consumer is queued but the messages map is
+%%                     not empty.
+%% Consumers that cannot take a message are dropped from the service queue
+%% and the function recurses on the original state.
+checkout_one(#?STATE{service_queue = SQ0,
+ messages = Messages0,
+ consumers = Cons0} = InitState) ->
+ case queue:peek(SQ0) of
+ {value, ConsumerId} ->
+ case take_next_msg(InitState) of
+ {ConsumerMsg, State0} ->
+ SQ1 = queue:drop(SQ0),
+ %% there are consumers waiting to be serviced
+ %% process consumer checkout
+ case maps:find(ConsumerId, Cons0) of
+ {ok, #consumer{credit = 0}} ->
+ %% no credit but was still on queue
+ %% can happen when draining
+ %% recurse without consumer on queue
+ checkout_one(InitState#?STATE{service_queue = SQ1});
+ {ok, #consumer{status = cancelled}} ->
+ checkout_one(InitState#?STATE{service_queue = SQ1});
+ {ok, #consumer{status = suspected_down}} ->
+ checkout_one(InitState#?STATE{service_queue = SQ1});
+ {ok, #consumer{checked_out = Checked0,
+ next_msg_id = Next,
+ credit = Credit,
+ delivery_count = DelCnt} = Con0} ->
+ %% record the message as checked out under the consumer's
+ %% next message id and consume one credit
+ Checked = maps:put(Next, ConsumerMsg, Checked0),
+ Con = Con0#consumer{checked_out = Checked,
+ next_msg_id = Next + 1,
+ credit = Credit - 1,
+ delivery_count = DelCnt + 1},
+ {Cons, SQ, []} = % we expect no effects
+ update_or_remove_sub(ConsumerId, Con,
+ Cons0, SQ1, []),
+ State1 = State0#?STATE{service_queue = SQ,
+ consumers = Cons},
+ %% move the bytes from the enqueue to the checkout counter;
+ %% messages with a payload also leave the in-memory counts.
+ %% Msg is the shape handed to checkout0/3: empty indexed
+ %% messages keep their raft index, other indexed messages
+ %% are reduced to {Header, RawMsg}
+ {State, Msg} =
+ case ConsumerMsg of
+ {'$prefix_msg', Header} ->
+ {subtract_in_memory_counts(
+ Header, add_bytes_checkout(Header, State1)),
+ ConsumerMsg};
+ {'$empty_msg', Header} ->
+ {add_bytes_checkout(Header, State1),
+ ConsumerMsg};
+ {_, {_, {Header, 'empty'}} = M} ->
+ {add_bytes_checkout(Header, State1),
+ M};
+ {_, {_, {Header, _} = M}} ->
+ {subtract_in_memory_counts(
+ Header,
+ add_bytes_checkout(Header, State1)),
+ M}
+ end,
+ {success, ConsumerId, Next, Msg, State};
+ error ->
+ %% consumer did not exist but was queued, recurse
+ checkout_one(InitState#?STATE{service_queue = SQ1})
+ end;
+ empty ->
+ {nochange, InitState}
+ end;
+ empty ->
+ %% no consumer is waiting for service
+ case maps:size(Messages0) of
+ 0 -> {nochange, InitState};
+ _ -> {inactive, InitState}
+ end
+ end.
+
+%% Stores Con in the consumer map and decides whether the consumer has to be
+%% put on the service queue. Returns {Consumers, ServiceQueue, Effects}.
+%%  - credit is 0: the consumer is stored but not queued; a `once' consumer
+%%    with nothing checked out is removed from the map instead
+%%  - otherwise: the consumer is stored and queued for service unless it is
+%%    already on the service queue
+update_or_remove_sub(ConsumerId,
+                     #consumer{lifetime = auto, credit = 0} = Con,
+                     Cons, ServiceQueue, Effects) ->
+    {Cons#{ConsumerId => Con}, ServiceQueue, Effects};
+update_or_remove_sub(ConsumerId, #consumer{lifetime = auto} = Con,
+                     Cons, ServiceQueue, Effects) ->
+    Stored = Cons#{ConsumerId => Con},
+    {Stored, uniq_queue_in(ConsumerId, ServiceQueue), Effects};
+update_or_remove_sub(ConsumerId,
+                     #consumer{lifetime = once,
+                               checked_out = Checked,
+                               credit = 0} = Con,
+                     Cons, ServiceQueue, Effects) ->
+    % TODO: demonitor consumer pid but _only_ if there are no other
+    % monitors for this pid
+    Remaining = case maps:size(Checked) of
+                    0 ->
+                        % we're done with this consumer
+                        maps:remove(ConsumerId, Cons);
+                    _ ->
+                        % there are unsettled items so need to keep around
+                        Cons#{ConsumerId => Con}
+                end,
+    {Remaining, ServiceQueue, Effects};
+update_or_remove_sub(ConsumerId, #consumer{lifetime = once} = Con,
+                     Cons, ServiceQueue, Effects) ->
+    Stored = Cons#{ConsumerId => Con},
+    {Stored, uniq_queue_in(ConsumerId, ServiceQueue), Effects}.
+
+%% Appends Key to Queue unless it is already a member.
+uniq_queue_in(Key, Queue) ->
+    % TODO: queue:member could surely be quite expensive, however the practical
+    % number of unique consumers may not be large enough for it to matter
+    case queue:member(Key, Queue) of
+        false ->
+            queue:in(Key, Queue);
+        true ->
+            Queue
+    end.
+
+%% Registers or updates a consumer according to the consumer strategy.
+%% `competing' queues, and `single_active' queues with an empty consumer
+%% map, delegate to update_consumer0/4. On a `single_active' queue that
+%% already has an entry in the consumer map the new consumer is appended to
+%% the waiting list instead.
+update_consumer(ConsumerId, Meta, Spec,
+                #?STATE{cfg = #cfg{consumer_strategy = competing}} = State) ->
+    %% general case, single active consumer off
+    update_consumer0(ConsumerId, Meta, Spec, State);
+update_consumer(ConsumerId, Meta, Spec,
+                #?STATE{consumers = Cons,
+                        cfg = #cfg{consumer_strategy = single_active}} = State)
+  when map_size(Cons) == 0 ->
+    %% single active consumer on, no one is consuming yet
+    update_consumer0(ConsumerId, Meta, Spec, State);
+update_consumer(ConsumerId, Meta, {Life, Credit, Mode},
+                #?STATE{cfg = #cfg{consumer_strategy = single_active},
+                        waiting_consumers = Waiting} = State) ->
+    %% single active consumer on and one active consumer already:
+    %% the newcomer goes to the back of the waiting list
+    Entry = {ConsumerId, #consumer{lifetime = Life, meta = Meta,
+                                   credit = Credit, credit_mode = Mode}},
+    State#?STATE{waiting_consumers = Waiting ++ [Entry]}.
+
+%% Inserts a new consumer built from Meta and the {Life, Credit, Mode} spec,
+%% or updates an existing one, and queues it for service if it ends up with
+%% credit. For an existing consumer only `lifetime' and `credit' are
+%% replaced; Meta and Mode from the new spec are not applied.
+update_consumer0(ConsumerId, Meta, {Life, Credit, Mode},
+ #?STATE{consumers = Cons0,
+ service_queue = ServiceQueue0} = State0) ->
+ %% TODO: this logic may not be correct for updating a pre-existing consumer
+ Init = #consumer{lifetime = Life, meta = Meta,
+ credit = Credit, credit_mode = Mode},
+ Cons = maps:update_with(ConsumerId,
+ fun(S) ->
+ %% remove any in-flight messages from
+ %% the credit update
+ %% (never going below 0)
+ N = maps:size(S#consumer.checked_out),
+ C = max(0, Credit - N),
+ S#consumer{lifetime = Life, credit = C}
+ end, Init, Cons0),
+ ServiceQueue = maybe_queue_consumer(ConsumerId, maps:get(ConsumerId, Cons),
+ ServiceQueue0),
+ State0#?STATE{consumers = Cons, service_queue = ServiceQueue}.
+
+%% Puts the consumer on the service queue (unless it is already there) when
+%% it has credit; otherwise the service queue is returned unchanged.
+maybe_queue_consumer(ConsumerId, #consumer{credit = Credit}, ServiceQueue)
+  when Credit > 0 ->
+    % consumer needs service - check if already on service queue
+    uniq_queue_in(ConsumerId, ServiceQueue);
+maybe_queue_consumer(_ConsumerId, #consumer{}, ServiceQueue) ->
+    ServiceQueue.
+
+%% Converts the 2-tuple `prefix_msgs' format {Returns, Msgs} to the 4-tuple
+%% format that also carries the length of each list. A state that is not in
+%% the 2-tuple format is returned unchanged.
+convert_prefix_msgs(#?STATE{prefix_msgs = {Returns, Msgs}} = State) ->
+    Counted = {length(Returns), Returns, length(Msgs), Msgs},
+    State#?STATE{prefix_msgs = Counted};
+convert_prefix_msgs(State) ->
+    State.
+
+%% creates a dehydrated version of the current state to be cached and
+%% potentially used for a snapshot at a later point
+%% All message payloads and raft indexes are dropped: returns and messages
+%% are folded into `prefix_msgs' as headers (or {'$empty_msg', Header} /
+%% {Header, 'empty'} entries for messages without an in-memory payload) and
+%% the checked out messages of every consumer are reduced to prefix form.
+%% Expects `prefix_msgs' to already be in the 4-tuple format.
+dehydrate_state(#?STATE{messages = Messages,
+ consumers = Consumers,
+ returns = Returns,
+ low_msg_num = Low,
+ next_msg_num = Next,
+ prefix_msgs = {PRCnt, PrefRet0, PPCnt, PrefMsg0},
+ waiting_consumers = Waiting0} = State) ->
+ RCnt = lqueue:len(Returns),
+ %% TODO: optimise this function as far as possible
+ %% foldr keeps the returns in their queue order
+ PrefRet1 = lists:foldr(fun ({'$prefix_msg', Header}, Acc) ->
+ [Header | Acc];
+ ({'$empty_msg', _} = Msg, Acc) ->
+ [Msg | Acc];
+ ({_, {_, {Header, 'empty'}}}, Acc) ->
+ [{'$empty_msg', Header} | Acc];
+ ({_, {_, {Header, _}}}, Acc) ->
+ [Header | Acc]
+ end,
+ [],
+ lqueue:to_list(Returns)),
+ PrefRet = PrefRet0 ++ PrefRet1,
+ %% messages numbered Low..Next-1, in ascending order
+ PrefMsgsSuff = dehydrate_messages(Low, Next - 1, Messages, []),
+ %% prefix messages are not populated in normal operation only after
+ %% recovering from a snapshot
+ PrefMsgs = PrefMsg0 ++ PrefMsgsSuff,
+ Waiting = [{Cid, dehydrate_consumer(C)} || {Cid, C} <- Waiting0],
+ State#?STATE{messages = #{},
+ ra_indexes = rabbit_fifo_index:empty(),
+ release_cursors = lqueue:new(),
+ low_msg_num = undefined,
+ consumers = maps:map(fun (_, C) ->
+ dehydrate_consumer(C)
+ end, Consumers),
+ returns = lqueue:new(),
+ prefix_msgs = {PRCnt + RCnt, PrefRet,
+ PPCnt + maps:size(Messages), PrefMsgs},
+ waiting_consumers = Waiting}.
+
+%% Walks the message numbers from Next down to Low and builds, in ascending
+%% message number order, the list of dehydrated entries: the bare header for
+%% messages with a payload and {Header, 'empty'} for messages without one.
+%% Stops as soon as Next < Low, which is also immediately the case when Low
+%% is the atom `undefined' (numbers sort before atoms).
+dehydrate_messages(Low, Next, _Msgs, Acc)
+  when Next < Low ->
+    Acc;
+dehydrate_messages(Low, Next, Msgs, Acc) ->
+    Entry = case maps:get(Next, Msgs) of
+                {_RaftIdx, {_, 'empty'} = Stub} ->
+                    Stub;
+                {_RaftIdx, {Header, _}} ->
+                    Header
+            end,
+    dehydrate_messages(Low, Next - 1, Msgs, [Entry | Acc]).
+
+%% Reduces every checked out message of the consumer to prefix form:
+%% {'$empty_msg', Header} for messages without an in-memory payload and
+%% {'$prefix_msg', Header} otherwise. Messages already in prefix form are
+%% kept as they are.
+dehydrate_consumer(#consumer{checked_out = Checked0} = Con) ->
+    Strip = fun (_, {Tag, _} = Prefix)
+                  when Tag == '$prefix_msg'; Tag == '$empty_msg' ->
+                    Prefix;
+                (_, {_, {_, {Header, 'empty'}}}) ->
+                    {'$empty_msg', Header};
+                (_, {_, {_, {Header, _}}}) ->
+                    {'$prefix_msg', Header}
+            end,
+    Con#consumer{checked_out = maps:map(Strip, Checked0)}.
+
+%% make the state suitable for equality comparison
+%% The release cursor queue is rebuilt from its list of elements.
+normalize(#?STATE{release_cursors = Cursors} = State) ->
+    Rebuilt = lqueue:from_list(lqueue:to_list(Cursors)),
+    State#?STATE{release_cursors = Rebuilt}.
+
+%% Returns true when the number of ready messages exceeds max_length or the
+%% enqueued bytes exceed max_bytes. Always false when neither limit is set.
+%% A single limit left as `undefined' never trips: numbers sort before atoms
+%% in Erlang term order.
+is_over_limit(#?STATE{cfg = #cfg{max_length = undefined,
+                                 max_bytes = undefined}}) ->
+    false;
+is_over_limit(#?STATE{cfg = #cfg{max_length = MaxLength,
+                                 max_bytes = MaxBytes},
+                      msg_bytes_enqueue = BytesEnq} = State) ->
+    TooLong = messages_ready(State) > MaxLength,
+    TooLong orelse (BytesEnq > MaxBytes).
+
+%% Brings `release_cursor_interval' and `prefix_msgs' into their newest
+%% formats.
+normalize_for_v1(#?STATE{cfg = Cfg} = State) ->
+ %% run all v0 conversions so that v1 does not have to have this code
+ RCI = case Cfg of
+ #cfg{release_cursor_interval = {_, _} = R} ->
+ %% already in the tuple format
+ R;
+ #cfg{release_cursor_interval = undefined} ->
+ {?RELEASE_CURSOR_EVERY, ?RELEASE_CURSOR_EVERY};
+ #cfg{release_cursor_interval = C} ->
+ %% NOTE(review): other conversions in this module turn an
+ %% integer C into {C, C}; here the base becomes
+ %% ?RELEASE_CURSOR_EVERY - confirm the difference is intended
+ {?RELEASE_CURSOR_EVERY, C}
+ end,
+ convert_prefix_msgs(
+ State#?STATE{cfg = Cfg#cfg{release_cursor_interval = RCI}}).
+
+%% Reads the state record field named Field by position. Exits with
+%% {field_not_found, Field} when the record has no such field.
+get_field(Field, State) ->
+    Position = record_index_of(Field, record_info(fields, ?STATE)),
+    element(Position, State).
+
+%% Reads the field named Field from the state's `cfg' record by position.
+%% Exits with {field_not_found, Field} when the record has no such field.
+get_cfg_field(Field, #?STATE{cfg = Cfg}) ->
+    Position = record_index_of(Field, record_info(fields, cfg)),
+    element(Position, Cfg).
+
+%% Returns the tuple position of field F in a record whose field names are
+%% Fields. Counting starts at 2 because element 1 of a record tuple is the
+%% record name. Exits with {field_not_found, F} when F is not in Fields.
+record_index_of(F, Fields) ->
+    index_of(2, F, Fields).
+
+index_of(N, F, [F | _]) ->
+    N;
+index_of(N, F, [_ | Rest]) ->
+    index_of(N + 1, F, Rest);
+index_of(_, F, []) ->
+    exit({field_not_found, F}).
+
+%% Builds the `enqueue' command record from a sender pid, sequence number
+%% and message.
+-spec make_enqueue(option(pid()), option(msg_seqno()), raw_msg()) -> protocol().
+make_enqueue(Pid, Seq, Msg) ->
+    #enqueue{msg = Msg, seq = Seq, pid = Pid}.
+%% Builds the `checkout' command record for a consumer.
+-spec make_checkout(consumer_id(),
+                    checkout_spec(), consumer_meta()) -> protocol().
+make_checkout(ConsumerId, Spec, Meta) ->
+    #checkout{meta = Meta,
+              spec = Spec,
+              consumer_id = ConsumerId}.
+
+%% Builds the `settle' command record for the given message ids.
+-spec make_settle(consumer_id(), [msg_id()]) -> protocol().
+make_settle(ConsumerId, MsgIds) ->
+    #settle{msg_ids = MsgIds, consumer_id = ConsumerId}.
+
+%% Builds the `return' command record for the given message ids.
+-spec make_return(consumer_id(), [msg_id()]) -> protocol().
+make_return(ConsumerId, MsgIds) ->
+    #return{msg_ids = MsgIds, consumer_id = ConsumerId}.
+
+%% Builds the `discard' command record for the given message ids.
+-spec make_discard(consumer_id(), [msg_id()]) -> protocol().
+make_discard(ConsumerId, MsgIds) ->
+    #discard{msg_ids = MsgIds, consumer_id = ConsumerId}.
+
+%% Builds the `credit' command record for a consumer.
+-spec make_credit(consumer_id(), non_neg_integer(), non_neg_integer(),
+                  boolean()) -> protocol().
+make_credit(ConsumerId, Credit, DeliveryCount, Drain) ->
+    #credit{drain = Drain,
+            delivery_count = DeliveryCount,
+            credit = Credit,
+            consumer_id = ConsumerId}.
+
+%% Builds the `purge' command record.
+-spec make_purge() -> protocol().
+make_purge() ->
+    #purge{}.
+
+%% Builds the `purge_nodes' command record for the given nodes.
+-spec make_purge_nodes([node()]) -> protocol().
+make_purge_nodes(Nodes) when true ->
+    #purge_nodes{nodes = Nodes}.
+
+%% Builds the `update_config' command record carrying Config.
+-spec make_update_config(config()) -> protocol().
+make_update_config(Config) ->
+    Command = #update_config{config = Config},
+    Command.
+
+%% Adds the size of an enqueued message (an integer, or a header map with a
+%% `size' key) to the enqueue byte counter.
+add_bytes_enqueue(#{size := Bytes}, State) ->
+    add_bytes_enqueue(Bytes, State);
+add_bytes_enqueue(Bytes, #?STATE{msg_bytes_enqueue = Enqueued} = State)
+  when is_integer(Bytes) ->
+    State#?STATE{msg_bytes_enqueue = Enqueued + Bytes}.
+
+%% Subtracts the size of a dropped message (an integer, or a header map with
+%% a `size' key) from the enqueue byte counter.
+add_bytes_drop(#{size := Bytes}, State) ->
+    add_bytes_drop(Bytes, State);
+add_bytes_drop(Bytes, #?STATE{msg_bytes_enqueue = Enqueued} = State)
+  when is_integer(Bytes) ->
+    State#?STATE{msg_bytes_enqueue = Enqueued - Bytes}.
+
+%% Moves the size of a checked out message (an integer, or a header map with
+%% a `size' key) from the enqueue byte counter to the checkout byte counter.
+add_bytes_checkout(#{size := Bytes}, State) ->
+    add_bytes_checkout(Bytes, State);
+add_bytes_checkout(Bytes,
+                   #?STATE{msg_bytes_checkout = CheckedOut,
+                           msg_bytes_enqueue = Enqueued} = State)
+  when is_integer(Bytes) ->
+    State#?STATE{msg_bytes_enqueue = Enqueued - Bytes,
+                 msg_bytes_checkout = CheckedOut + Bytes}.
+
+%% Subtracts the size of a settled message (an integer, or a header map with
+%% a `size' key) from the checkout byte counter.
+add_bytes_settle(#{size := Bytes}, State) ->
+    add_bytes_settle(Bytes, State);
+add_bytes_settle(Bytes, #?STATE{msg_bytes_checkout = CheckedOut} = State)
+  when is_integer(Bytes) ->
+    State#?STATE{msg_bytes_checkout = CheckedOut - Bytes}.
+
+%% Moves the size of a returned message (an integer, or a header map with a
+%% `size' key) from the checkout byte counter back to the enqueue byte
+%% counter.
+add_bytes_return(#{size := Bytes}, State) ->
+    add_bytes_return(Bytes, State);
+add_bytes_return(Bytes,
+                 #?STATE{msg_bytes_checkout = CheckedOut,
+                         msg_bytes_enqueue = Enqueued} = State)
+  when is_integer(Bytes) ->
+    State#?STATE{msg_bytes_enqueue = Enqueued + Bytes,
+                 msg_bytes_checkout = CheckedOut - Bytes}.
+
+%% Accounts for one more message held in memory: adds its size (an integer,
+%% or a header map with a `size' key) to the in-memory byte counter and
+%% increments the in-memory ready message count.
+add_in_memory_counts(#{size := Bytes}, State) ->
+    add_in_memory_counts(Bytes, State);
+add_in_memory_counts(Bytes,
+                     #?STATE{msg_bytes_in_memory = MemBytes,
+                             msgs_ready_in_memory = MemCount} = State)
+  when is_integer(Bytes) ->
+    State#?STATE{msgs_ready_in_memory = MemCount + 1,
+                 msg_bytes_in_memory = MemBytes + Bytes}.
+
+%% Accounts for one message less held in memory: subtracts its size (an
+%% integer, or a header map with a `size' key) from the in-memory byte
+%% counter and decrements the in-memory ready message count.
+subtract_in_memory_counts(#{size := Bytes}, State) ->
+    subtract_in_memory_counts(Bytes, State);
+subtract_in_memory_counts(Bytes,
+                          #?STATE{msg_bytes_in_memory = MemBytes,
+                                  msgs_ready_in_memory = MemCount} = State)
+  when is_integer(Bytes) ->
+    State#?STATE{msgs_ready_in_memory = MemCount - 1,
+                 msg_bytes_in_memory = MemBytes - Bytes}.
+
+%% Returns the size of a message:
+%%  - #basic_message{}: the byte size of its payload fragments
+%%  - prefix / empty prefix messages: the size recorded in the header
+%%  - binaries: their byte size
+%%  - any other term: erts_debug:size/1 of the term
+message_size(#basic_message{content = Content}) ->
+    #content{payload_fragments_rev = Fragments} = Content,
+    iolist_size(Fragments);
+message_size({Tag, Header})
+  when Tag == '$prefix_msg'; Tag == '$empty_msg' ->
+    get_size_from_header(Header);
+message_size(Bin) when is_binary(Bin) ->
+    byte_size(Bin);
+message_size(Other) ->
+    %% probably only hit this for testing so ok to use erts_debug
+    erts_debug:size(Other).
+
+%% Extracts the message size from a header, which is either the size itself
+%% (an integer) or a map with a `size' key.
+get_size_from_header(#{size := Bytes}) ->
+    Bytes;
+get_size_from_header(Bytes) when is_integer(Bytes) ->
+    Bytes.
+
+
+%% Returns the list of distinct nodes hosting at least one consumer,
+%% enqueuer or waiting consumer process.
+all_nodes(#?STATE{consumers = Cons0,
+                  enqueuers = Enqs0,
+                  waiting_consumers = WaitingConsumers0}) ->
+    Mark = fun (Pid, Seen) -> Seen#{node(Pid) => ok} end,
+    FromConsumers = maps:fold(fun ({_, P}, _, Acc) -> Mark(P, Acc) end,
+                              #{}, Cons0),
+    FromEnqueuers = maps:fold(fun (P, _, Acc) -> Mark(P, Acc) end,
+                              FromConsumers, Enqs0),
+    FromWaiting = lists:foldl(fun ({{_, P}, _}, Acc) -> Mark(P, Acc) end,
+                              FromEnqueuers, WaitingConsumers0),
+    maps:keys(FromWaiting).
+
+%% Returns the pids of all consumers, enqueuers and waiting consumers that
+%% live on Node. A pid appears once per entry it is found in.
+all_pids_for(Node, #?STATE{consumers = Cons0,
+                           enqueuers = Enqs0,
+                           waiting_consumers = WaitingConsumers0}) ->
+    Collect = fun (P, Acc) when node(P) =:= Node -> [P | Acc];
+                  (_, Acc) -> Acc
+              end,
+    FromConsumers = maps:fold(fun ({_, P}, _, Acc) -> Collect(P, Acc);
+                                  (_, _, Acc) -> Acc
+                              end, [], Cons0),
+    FromEnqueuers = maps:fold(fun (P, _, Acc) -> Collect(P, Acc) end,
+                              FromConsumers, Enqs0),
+    lists:foldl(fun ({{_, P}, _}, Acc) -> Collect(P, Acc);
+                    (_, Acc) -> Acc
+                end, FromEnqueuers, WaitingConsumers0).
+
+%% Returns the pids of all consumers, enqueuers and waiting consumers on
+%% Node whose status is `suspected_down'.
+suspected_pids_for(Node, #?STATE{consumers = Cons0,
+                                 enqueuers = Enqs0,
+                                 waiting_consumers = WaitingConsumers0}) ->
+    Collect = fun (P, suspected_down, Acc) when node(P) =:= Node ->
+                      [P | Acc];
+                  (_, _, Acc) ->
+                      Acc
+              end,
+    FromConsumers = maps:fold(fun ({_, P}, #consumer{status = St}, Acc) ->
+                                      Collect(P, St, Acc);
+                                  (_, _, Acc) ->
+                                      Acc
+                              end, [], Cons0),
+    FromEnqueuers = maps:fold(fun (P, #enqueuer{status = St}, Acc) ->
+                                      Collect(P, St, Acc);
+                                  (_, _, Acc) ->
+                                      Acc
+                              end, FromConsumers, Enqs0),
+    lists:foldl(fun ({{_, P}, #consumer{status = St}}, Acc) ->
+                        Collect(P, St, Acc);
+                    (_, Acc) ->
+                        Acc
+                end, FromEnqueuers, WaitingConsumers0).
diff --git a/src/rabbit_fifo_v0.hrl b/src/rabbit_fifo_v0.hrl
new file mode 100644
index 0000000000..333ccb4d77
--- /dev/null
+++ b/src/rabbit_fifo_v0.hrl
@@ -0,0 +1,195 @@
+
+-type option(T) :: undefined | T. % a value that may be absent (undefined)
+
+-type raw_msg() :: term().
+%% The raw message. It is opaque to rabbit_fifo.
+
+-type msg_in_id() :: non_neg_integer().
+% A queue-scoped monotonically incrementing integer used to enforce order
+% in the unassigned messages map.
+
+-type msg_id() :: non_neg_integer().
+%% A consumer-scoped monotonically incrementing integer included with a
+%% {@link delivery/0.}. Used to settle deliveries using
+%% {@link rabbit_fifo_client:settle/3.}
+
+-type msg_seqno() :: non_neg_integer().
+%% A sender-process-scoped monotonically incrementing integer included
+%% in enqueue messages. Used to ensure ordering of messages sent from the
+%% same process.
+
+-type msg_header() :: msg_size() |
+ #{size := msg_size(),
+ delivery_count => non_neg_integer()}.
+%% The message header:
+%% delivery_count: the number of unsuccessful delivery attempts.
+%% A non-zero value indicates a previous attempt.
+%% A header that only contains the size can be condensed to that integer.
+
+-type msg() :: {msg_header(), raw_msg()}.
+%% A raw message paired with its header (a map or the condensed size).
+
+-type msg_size() :: non_neg_integer().
+%% The size in bytes of the msg payload.
+
+-type indexed_msg() :: {ra:index(), msg()}. % a message paired with a ra index
+
+-type prefix_msg() :: {'$prefix_msg', msg_header()}. % header-only placeholder; carries no payload
+
+-type delivery_msg() :: {msg_id(), msg()}.
+%% A tuple consisting of the message id and the headered message.
+
+-type consumer_tag() :: binary().
+%% An arbitrary binary tag used to distinguish between different consumers
+%% set up by the same process. See: {@link rabbit_fifo_client:checkout/3.}
+
+-type delivery() :: {delivery, consumer_tag(), [delivery_msg()]}.
+%% Represents the delivery of one or more rabbit_fifo messages.
+
+-type consumer_id() :: {consumer_tag(), pid()}.
+%% The entity that receives messages. Uniquely identifies a consumer.
+
+-type credit_mode() :: simple_prefetch | credited.
+%% Determines how credit is replenished.
+
+-type checkout_spec() :: {once | auto, Num :: non_neg_integer(),
+ credit_mode()} |
+ {dequeue, settled | unsettled} |
+ cancel.
+
+-type consumer_meta() :: #{ack => boolean(),
+ username => binary(),
+ prefetch => non_neg_integer(),
+ args => list()}.
+%% Static meta data associated with a consumer.
+
+
+-type applied_mfa() :: {module(), atom(), list()}.
+% Represents a partially applied module call.
+
+-define(RELEASE_CURSOR_EVERY, 64000).
+-define(RELEASE_CURSOR_EVERY_MAX, 3200000).
+-define(USE_AVG_HALF_LIFE, 10000.0).
+%% an average QQ without any message uses about 100KB so setting this limit
+%% to ~20 times that (2,000,000 bytes) should be relatively safe.
+-define(GC_MEM_LIMIT_B, 2000000).
+
+-define(MB, 1048576). % 1024 * 1024
+-define(STATE, rabbit_fifo). % name of the state record declared below
+
+-record(consumer, % per-consumer state; stored by consumer_id() in the queue state
+        {meta = #{} :: consumer_meta(),
+         checked_out = #{} :: #{msg_id() => {msg_in_id(), indexed_msg()}},
+         next_msg_id = 0 :: msg_id(), % part of snapshot data
+         %% max number of messages that can be sent
+         %% decremented for each delivery
+         credit = 0 :: non_neg_integer(),
+         %% total number of checked out messages - ever
+         %% incremented for each delivery
+         delivery_count = 0 :: non_neg_integer(),
+         %% the mode of how credit is incremented
+         %% simple_prefetch: credit is re-filled as deliveries are settled
+         %% or returned.
+         %% credited: credit can only be changed by receiving a consumer_credit
+         %% command: `{consumer_credit, ReceiverDeliveryCount, Credit}'
+         credit_mode = simple_prefetch :: credit_mode(), % part of snapshot data
+         lifetime = once :: once | auto,
+         status = up :: up | suspected_down | cancelled
+        }).
+
+-type consumer() :: #consumer{}. % shorthand for the consumer record
+
+-type consumer_strategy() :: competing | single_active. % single_active: single active consumer is on
+
+-record(enqueuer, % one per enqueuing process; values of the enqueuers map in the state
+ {next_seqno = 1 :: msg_seqno(), % presumably the next seqno expected from this process - TODO confirm
+ % out of order enqueues - sorted list
+ pending = [] :: [{msg_seqno(), ra:index(), raw_msg()}],
+ status = up :: up | suspected_down
+ }).
+
+-record(cfg, % queue configuration; held in the cfg field of the state record
+ {name :: atom(),
+ resource :: rabbit_types:r('queue'),
+ release_cursor_interval ::
+ undefined | non_neg_integer() |
+ {non_neg_integer(), non_neg_integer()}, % NOTE(review): meaning of the pair is not visible here - confirm
+ dead_letter_handler :: option(applied_mfa()),
+ become_leader_handler :: option(applied_mfa()),
+ max_length :: option(non_neg_integer()), % presumably undefined means no limit - TODO confirm
+ max_bytes :: option(non_neg_integer()),
+ %% whether single active consumer is on or not for this queue
+ consumer_strategy = competing :: consumer_strategy(),
+ %% the maximum number of unsuccessful delivery attempts permitted
+ delivery_limit :: option(non_neg_integer()),
+ max_in_memory_length :: option(non_neg_integer()),
+ max_in_memory_bytes :: option(non_neg_integer())
+ }).
+
+-type prefix_msgs() :: {list(), list()} | % two-list form
+ {non_neg_integer(), list(), % presumably each integer is the length of the list after it - TODO confirm
+ non_neg_integer(), list()}. % the state record defaults to this form: {0, [], 0, []}
+
+-record(?STATE, % the queue machine state; ?STATE expands to rabbit_fifo
+ {cfg :: #cfg{},
+ % unassigned messages
+ messages = #{} :: #{msg_in_id() => indexed_msg()},
+ % defines the lowest msg_in_id available in the messages map
+ % that isn't a return
+ low_msg_num :: option(msg_in_id()),
+ % defines the next msg_in_id to be added to the messages map
+ next_msg_num = 1 :: msg_in_id(),
+ % list of returned msg_in_ids - when checking out it picks from
+ % this list first before taking low_msg_num
+ returns = lqueue:new() :: lqueue:lqueue(prefix_msg() |
+ {msg_in_id(), indexed_msg()}),
+ % a counter of enqueues - used to trigger shadow copy points
+ enqueue_count = 0 :: non_neg_integer(),
+ % a map containing all the live processes that have ever enqueued
+ % a message to this queue as well as a cached value of the smallest
+ % ra_index of all pending enqueues
+ enqueuers = #{} :: #{pid() => #enqueuer{}},
+ % master index of all enqueue raft indexes including pending
+ % enqueues
+ % rabbit_fifo_index can be slow when calculating the smallest
+ % index when there are large gaps but should be faster than gb_trees
+ % for normal appending operations as it's backed by a map
+ ra_indexes = rabbit_fifo_index:empty() :: rabbit_fifo_index:state(),
+ release_cursors = lqueue:new() :: lqueue:lqueue({release_cursor,
+ ra:index(), #?STATE{}}),
+ % consumers need to reflect consumer state at time of snapshot
+ % needs to be part of snapshot
+ consumers = #{} :: #{consumer_id() => #consumer{}},
+ % consumers that require further service are queued here
+ % needs to be part of snapshot
+ service_queue = queue:new() :: queue:queue(consumer_id()),
+ %% This is a special field that is only used for snapshots
+ %% It represents the queued messages at the time the
+ %% dehydrated snapshot state was cached.
+ %% As release_cursors are only emitted for raft indexes where all
+ %% prior messages no longer contribute to the current state we can
+ %% replace all message payloads with their sizes (to be used for
+ %% overflow calculations).
+ %% This is done so that consumers are still served in a deterministic
+ %% order on recovery.
+ prefix_msgs = {0, [], 0, []} :: prefix_msgs(),
+ msg_bytes_enqueue = 0 :: non_neg_integer(),
+ msg_bytes_checkout = 0 :: non_neg_integer(),
+ %% waiting consumers, one is picked when the active consumer is cancelled
+ %% or dies; used only when single active consumer is on
+ waiting_consumers = [] :: [{consumer_id(), consumer()}],
+ msg_bytes_in_memory = 0 :: non_neg_integer(),
+ msgs_ready_in_memory = 0 :: non_neg_integer()
+ }).
+
+-type config() :: #{name := atom(), % `:=' keys are mandatory, `=>' keys are optional
+ queue_resource := rabbit_types:r('queue'),
+ dead_letter_handler => applied_mfa(),
+ become_leader_handler => applied_mfa(),
+ release_cursor_interval => non_neg_integer(),
+ max_length => non_neg_integer(),
+ max_bytes => non_neg_integer(),
+ max_in_memory_length => non_neg_integer(),
+ max_in_memory_bytes => non_neg_integer(),
+ single_active_consumer_on => boolean(), % presumably selects consumer_strategy() in #cfg{} - TODO confirm
+ delivery_limit => non_neg_integer()}.
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index 915cf9d527..327fc11a5e 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -19,7 +19,7 @@
-export([cluster_state/1, status/2]).
-export([update_consumer_handler/8, update_consumer/9]).
-export([cancel_consumer_handler/2, cancel_consumer/3]).
--export([become_leader/2, handle_tick/3]).
+-export([become_leader/2, handle_tick/3, spawn_deleter/1]).
-export([rpc_delete_metrics/1]).
-export([format/1]).
-export([open_files/1]).
@@ -102,6 +102,7 @@ handle_event({ra_event, From, Evt}, QState) ->
{new | existing, amqqueue:amqqueue()} | rabbit_types:channel_exit().
declare(Q) when ?amqqueue_is_quorum(Q) ->
+ rabbit_log:info("quorum_queue declaring ~w", [Q]),
QName = amqqueue:get_name(Q),
Durable = amqqueue:is_durable(Q),
AutoDelete = amqqueue:is_auto_delete(Q),
@@ -125,6 +126,12 @@ declare(Q) when ?amqqueue_is_quorum(Q) ->
|| ServerId <- members(NewQ)],
case ra:start_cluster(RaConfs) of
{ok, _, _} ->
+ %% TODO: handle error - what should be done if the
+ %% config cannot be updated
+ ok = rabbit_fifo_client:update_machine_state(Id,
+ ra_machine_config(NewQ)),
+ %% force a policy change to ensure the latest config is
+ %% updated even when running the machine version from 0
rabbit_event:notify(queue_created,
[{name, QName},
{durable, Durable},
@@ -152,10 +159,15 @@ ra_machine_config(Q) when ?is_amqqueue(Q) ->
{Name, _} = amqqueue:get_pid(Q),
%% take the minimum value of the policy and the queue arg if present
MaxLength = args_policy_lookup(<<"max-length">>, fun min/2, Q),
+ %% prefer the policy defined strategy if available
+ Overflow = args_policy_lookup(<<"overflow">>, fun (A, _B) -> A end , Q),
MaxBytes = args_policy_lookup(<<"max-length-bytes">>, fun min/2, Q),
MaxMemoryLength = args_policy_lookup(<<"max-in-memory-length">>, fun min/2, Q),
MaxMemoryBytes = args_policy_lookup(<<"max-in-memory-bytes">>, fun min/2, Q),
DeliveryLimit = args_policy_lookup(<<"delivery-limit">>, fun min/2, Q),
+ Expires = args_policy_lookup(<<"expires">>,
+ fun (A, _B) -> A end,
+ Q),
#{name => Name,
queue_resource => QName,
dead_letter_handler => dlx_mfa(Q),
@@ -165,7 +177,10 @@ ra_machine_config(Q) when ?is_amqqueue(Q) ->
max_in_memory_length => MaxMemoryLength,
max_in_memory_bytes => MaxMemoryBytes,
single_active_consumer_on => single_active_consumer_on(Q),
- delivery_limit => DeliveryLimit
+ delivery_limit => DeliveryLimit,
+ overflow_strategy => overflow(Overflow, drop_head),
+ created => erlang:system_time(millisecond),
+ expires => Expires
}.
single_active_consumer_on(Q) ->
@@ -292,8 +307,14 @@ filter_quorum_critical(Queues, ReplicaStates) ->
-spec is_policy_applicable(amqqueue:amqqueue(), any()) -> boolean().
is_policy_applicable(_Q, Policy) ->
- Applicable = [<<"max-length">>, <<"max-length-bytes">>, <<"max-in-memory-length">>,
- <<"max-in-memory-bytes">>, <<"delivery-limit">>, <<"dead-letter-exchange">>,
+ Applicable = [<<"max-length">>,
+ <<"max-length-bytes">>,
+ <<"overflow">>,
+ <<"expires">>,
+ <<"max-in-memory-length">>,
+ <<"max-in-memory-bytes">>,
+ <<"delivery-limit">>,
+ <<"dead-letter-exchange">>,
<<"dead-letter-routing-key">>],
lists:all(fun({P, _}) ->
lists:member(P, Applicable)
@@ -304,6 +325,12 @@ rpc_delete_metrics(QName) ->
ets:delete(queue_metrics, QName),
ok.
+spawn_deleter(QName) -> % returns the pid of the (unlinked) deleter process
+    erlang:spawn(fun () ->
+                         {ok, Queue} = rabbit_amqqueue:lookup(QName),
+                         delete(Queue, false, false, <<"expired">>)
+                 end).
+
handle_tick(QName,
{Name, MR, MU, M, C, MsgBytesReady, MsgBytesUnack},
Nodes) ->
@@ -694,13 +721,29 @@ stateless_deliver(ServerId, Delivery) ->
-spec deliver(Confirm :: boolean(), rabbit_types:delivery(),
rabbit_fifo_client:state()) ->
- {ok | slow, rabbit_fifo_client:state()}.
+ {ok | slow, rabbit_fifo_client:state()} |
+ {reject_publish, rabbit_fifo_client:state()}. % NOTE(review): no clause below returns reject_publish
deliver(false, Delivery, QState0) ->
- rabbit_fifo_client:enqueue(Delivery#delivery.message, QState0);
+    case rabbit_fifo_client:enqueue(Delivery#delivery.message, QState0) of
+        {ok, _} = Res -> Res;
+        {slow, _} = Res -> Res;
+        {reject_publish, State} -> % Confirm is false: the rejection is not reported
+            {ok, State}
+    end;
deliver(true, Delivery, QState0) ->
- rabbit_fifo_client:enqueue(Delivery#delivery.msg_seq_no,
- Delivery#delivery.message, QState0).
+    Seq = Delivery#delivery.msg_seq_no,
+    case rabbit_fifo_client:enqueue(Seq, Delivery#delivery.message,
+                                    QState0) of
+        {ok, _} = Res -> Res;
+        {slow, _} = Res -> Res;
+        {reject_publish, State} ->
+            %% TODO: this works fine but once the queue types interface is in
+            %% place it could be replaced with an action or similar to avoid
+            %% self publishing messages.
+            gen_server2:cast(self(), {reject_publish, Seq, undefined}),
+            {ok, State}
+    end.
-spec info(amqqueue:amqqueue()) -> rabbit_types:infos().
@@ -783,9 +826,9 @@ maybe_delete_data_dir(UId) ->
ok
end.
-policy_changed(QName, Node) ->
+policy_changed(QName, Server) -> % pushes the queue's current machine config to Server
{ok, Q} = rabbit_amqqueue:lookup(QName),
- rabbit_fifo_client:update_machine_state(Node, ra_machine_config(Q)).
+ rabbit_fifo_client:update_machine_state(Server, ra_machine_config(Q)). % config is rebuilt from the looked-up queue
-spec cluster_state(Name :: atom()) -> 'down' | 'recovering' | 'running'.
@@ -1320,8 +1363,8 @@ maybe_send_reply(_ChPid, undefined) -> ok;
maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg).
check_invalid_arguments(QueueName, Args) ->
- Keys = [<<"x-expires">>, <<"x-message-ttl">>,
- <<"x-max-priority">>, <<"x-queue-mode">>, <<"x-overflow">>],
+ Keys = [<<"x-message-ttl">>,
+ <<"x-max-priority">>, <<"x-queue-mode">>],
[case rabbit_misc:table_lookup(Args, Key) of
undefined -> ok;
_TypeVal -> rabbit_misc:protocol_error(
@@ -1329,6 +1372,17 @@ check_invalid_arguments(QueueName, Args) ->
"invalid arg '~s' for ~s",
[Key, rabbit_misc:rs(QueueName)])
end || Key <- Keys],
+
+ case rabbit_misc:table_lookup(Args, <<"x-overflow">>) of
+ undefined -> ok;
+ {_, <<"reject-publish-dlx">>} ->
+ rabbit_misc:protocol_error(
+ precondition_failed,
+ "invalid arg 'x-overflow' with value 'reject-publish-dlx' for ~s",
+ [rabbit_misc:rs(QueueName)]);
+ _ ->
+ ok
+ end,
ok.
check_auto_delete(Q) when ?amqqueue_is_auto_delete(Q) ->
@@ -1413,3 +1467,15 @@ get_nodes(Q) when ?is_amqqueue(Q) ->
update_type_state(Q, Fun) when ?is_amqqueue(Q) ->
Ts = amqqueue:get_type_state(Q),
amqqueue:set_type_state(Q, Fun(Ts)).
+
+%% Translates the "overflow" queue argument / policy value into the strategy
+%% atom placed in the machine config. Def is returned when nothing is set.
+%% `reject-publish-dlx' is only rejected for the x-overflow argument, so a
+%% policy may still carry it: warn and fall back to Def instead of crashing.
+overflow(undefined, Def) -> Def;
+overflow(<<"reject-publish">>, _Def) -> reject_publish;
+overflow(<<"drop-head">>, _Def) -> drop_head;
+overflow(<<"reject-publish-dlx">> = Value, Def) ->
+    rabbit_log:warning("Invalid overflow strategy ~p for quorum queue, "
+                       "using ~p instead", [Value, Def]),
+    Def.